HDFS-8652. Track BlockInfo instead of Block in CorruptReplicasMap. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2015-07-06 15:54:07 -07:00
parent 47a69ec7a5
commit d62b63d297
17 changed files with 169 additions and 168 deletions

View File

@ -701,6 +701,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8709. Clarify automatic sync in FSEditLog#logEdit. (wang)
HDFS-8652. Track BlockInfo instead of Block in CorruptReplicasMap. (jing9)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -179,7 +179,7 @@ public abstract class BlockInfo extends Block
* information indicating the index of the block in the
* corresponding block group.
*/
abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
abstract void addStorage(DatanodeStorageInfo storage, Block reportedBlock);
/**
* Remove {@link DatanodeStorageInfo} location for a block
@ -192,6 +192,11 @@ public abstract class BlockInfo extends Block
*/
abstract void replaceBlock(BlockInfo newBlock);
/**
* @return true if there is no storage storing the block
*/
abstract boolean hasEmptyStorage();
/**
* Find specified DatanodeStorageInfo.
* @return DatanodeStorageInfo or null if not found.

View File

@ -45,8 +45,8 @@ public class BlockInfoContiguous extends BlockInfo {
}
@Override
boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
return ContiguousBlockStorageOp.addStorage(this, storage);
void addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
ContiguousBlockStorageOp.addStorage(this, storage);
}
@Override
@ -73,4 +73,9 @@ public class BlockInfoContiguous extends BlockInfo {
ucBlock.setBlockCollection(getBlockCollection());
return ucBlock;
}
@Override
boolean hasEmptyStorage() {
return ContiguousBlockStorageOp.hasEmptyStorage(this);
}
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -274,18 +273,17 @@ public abstract class BlockInfoUnderConstruction extends BlockInfo {
"No blocks found, lease removed.");
}
boolean allLiveReplicasTriedAsPrimary = true;
for (int i = 0; i < replicas.size(); i++) {
for (ReplicaUnderConstruction replica : replicas) {
// Check if all replicas have been tried or not.
if (replicas.get(i).isAlive()) {
allLiveReplicasTriedAsPrimary =
(allLiveReplicasTriedAsPrimary &&
replicas.get(i).getChosenAsPrimary());
if (replica.isAlive()) {
allLiveReplicasTriedAsPrimary = allLiveReplicasTriedAsPrimary
&& replica.getChosenAsPrimary();
}
}
if (allLiveReplicasTriedAsPrimary) {
// Just set all the replicas to be chosen whether they are alive or not.
for (int i = 0; i < replicas.size(); i++) {
replicas.get(i).setChosenAsPrimary(false);
for (ReplicaUnderConstruction replica : replicas) {
replica.setChosenAsPrimary(false);
}
}
long mostRecentLastUpdate = 0;
@ -345,10 +343,6 @@ public abstract class BlockInfoUnderConstruction extends BlockInfo {
* Convert an under construction block to a complete block.
*
* @return a complete block.
* @throws IOException
* if the state of the block (the generation stamp and the length)
* has not been committed by the client or it does not have at
* least a minimal number of replicas reported from data-nodes.
*/
public abstract BlockInfo convertToCompleteBlock();
@ -386,8 +380,8 @@ public abstract class BlockInfoUnderConstruction extends BlockInfo {
}
private void appendUCParts(StringBuilder sb) {
sb.append("{UCState=").append(blockUCState)
.append(", truncateBlock=" + truncateBlock)
sb.append("{UCState=").append(blockUCState).append(", truncateBlock=")
.append(truncateBlock)
.append(", primaryNodeIndex=").append(primaryNodeIndex)
.append(", replicas=[");
if (replicas != null) {

View File

@ -55,10 +55,6 @@ public class BlockInfoUnderConstructionContiguous extends
* Convert an under construction block to a complete block.
*
* @return BlockInfo - a complete block.
* @throws IOException if the state of the block
* (the generation stamp and the length) has not been committed by
* the client or it does not have at least a minimal number of replicas
* reported from data-nodes.
*/
@Override
public BlockInfoContiguous convertToCompleteBlock() {
@ -69,8 +65,8 @@ public class BlockInfoUnderConstructionContiguous extends
}
@Override
boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
return ContiguousBlockStorageOp.addStorage(this, storage);
void addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
ContiguousBlockStorageOp.addStorage(this, storage);
}
@Override
@ -88,6 +84,11 @@ public class BlockInfoUnderConstructionContiguous extends
ContiguousBlockStorageOp.replaceBlock(this, newBlock);
}
@Override
boolean hasEmptyStorage() {
return ContiguousBlockStorageOp.hasEmptyStorage(this);
}
@Override
public void setExpectedLocations(DatanodeStorageInfo[] targets) {
int numLocations = targets == null ? 0 : targets.length;

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
@ -197,7 +196,7 @@ public class BlockManager implements BlockStatsMXBean {
* notified of all block deletions that might have been pending
* when the failover happened.
*/
private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
private final Set<BlockInfo> postponedMisreplicatedBlocks = Sets.newHashSet();
/**
* Maps a StorageID to the set of blocks that are "extra" for this
@ -338,8 +337,7 @@ public class BlockManager implements BlockStatsMXBean {
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
this.shouldCheckForEnoughRacks =
conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
? false : true;
conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
@ -465,8 +463,7 @@ public class BlockManager implements BlockStatsMXBean {
/** Should the access keys be updated? */
boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
return isBlockTokenEnabled()? blockTokenSecretManager.updateKeys(updateTime)
: false;
return isBlockTokenEnabled() && blockTokenSecretManager.updateKeys(updateTime);
}
public void activate(Configuration conf) {
@ -519,14 +516,14 @@ public class BlockManager implements BlockStatsMXBean {
synchronized (neededReplications) {
out.println("Metasave: Blocks waiting for replication: " +
neededReplications.size());
for (Block block : neededReplications) {
for (BlockInfo block : neededReplications) {
dumpBlockMeta(block, out);
}
}
// Dump any postponed over-replicated blocks
out.println("Mis-replicated blocks that have been postponed:");
for (Block block : postponedMisreplicatedBlocks) {
for (BlockInfo block : postponedMisreplicatedBlocks) {
dumpBlockMeta(block, out);
}
@ -544,11 +541,9 @@ public class BlockManager implements BlockStatsMXBean {
* Dump the metadata for the given block in a human-readable
* form.
*/
private void dumpBlockMeta(Block block, PrintWriter out) {
List<DatanodeDescriptor> containingNodes =
new ArrayList<DatanodeDescriptor>();
List<DatanodeStorageInfo> containingLiveReplicasNodes =
new ArrayList<>();
private void dumpBlockMeta(BlockInfo block, PrintWriter out) {
List<DatanodeDescriptor> containingNodes = new ArrayList<>();
List<DatanodeStorageInfo> containingLiveReplicasNodes = new ArrayList<>();
NumberReplicas numReplicas = new NumberReplicas();
// source node returned is not used
@ -556,17 +551,16 @@ public class BlockManager implements BlockStatsMXBean {
containingLiveReplicasNodes, numReplicas,
UnderReplicatedBlocks.LEVEL);
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which
// are not included in the numReplicas.liveReplicas() count
assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas();
int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedAndDecommissioning();
if (block instanceof BlockInfo) {
BlockCollection bc = ((BlockInfo) block).getBlockCollection();
BlockCollection bc = block.getBlockCollection();
String fileName = (bc == null) ? "[orphaned]" : bc.getName();
out.print(fileName + ": ");
}
// l: == live:, d: == decommissioned c: == corrupt e: == excess
out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
" (replicas:" +
@ -813,7 +807,8 @@ public class BlockManager implements BlockStatsMXBean {
final long offset, final long length, final int nrBlocksToReturn,
final AccessMode mode) throws IOException {
int curBlk;
long curPos = 0, blkSize = 0;
long curPos = 0;
long blkSize;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
blkSize = blocks[curBlk].getNumBytes();
@ -1204,10 +1199,11 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
*
* @param b
* Mark a replica (of a contiguous block) or an internal block (of a striped
* block group) as corrupt.
* @param b Indicating the reported bad block and the corresponding BlockInfo
* stored in blocksMap.
* @param storageInfo storage that contains the block, if known. null otherwise.
* @throws IOException
*/
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
DatanodeStorageInfo storageInfo,
@ -1228,7 +1224,7 @@ public class BlockManager implements BlockStatsMXBean {
}
// Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
corruptReplicas.addToCorruptReplicasMap(b.stored, node, b.reason,
b.reasonCode);
NumberReplicas numberOfReplicas = countNodes(b.stored);
@ -1250,7 +1246,7 @@ public class BlockManager implements BlockStatsMXBean {
if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
|| corruptedDuringWrite) {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node);
invalidateBlock(b, node, numberOfReplicas);
} else if (namesystem.isPopulatingReplQueues()) {
// add the block to neededReplication
updateNeededReplications(b.stored, -1, 0);
@ -1258,12 +1254,15 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
* Invalidates the given block on the given datanode.
* @return true if the block was successfully invalidated and no longer
* present in the BlocksMap
* Invalidates the given block on the given datanode. Note that before this
* call we have already checked the current live replicas of the block and
* make sure it's safe to invalidate the replica.
*
* @return true if the replica was successfully invalidated and no longer
* associated with the DataNode.
*/
private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
) throws IOException {
private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
NumberReplicas nr) throws IOException {
blockLog.info("BLOCK* invalidateBlock: {} on {}", b, dn);
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
@ -1272,35 +1271,30 @@ public class BlockManager implements BlockStatsMXBean {
}
// Check how many copies we have of the block
NumberReplicas nr = countNodes(b.stored);
if (nr.replicasOnStaleNodes() > 0) {
blockLog.info("BLOCK* invalidateBlocks: postponing " +
"invalidation of {} on {} because {} replica(s) are located on " +
"nodes with potentially out-of-date block reports", b, dn,
nr.replicasOnStaleNodes());
postponeBlock(b.corrupted);
postponeBlock(b.stored);
return false;
} else if (nr.liveReplicas() >= 1) {
// If we have at least one copy on a live node, then we can delete it.
} else {
// we already checked the number of replicas in the caller of this
// function and we know there is at least one copy on a live node, so we
// can delete it.
addToInvalidates(b.corrupted, dn);
removeStoredBlock(b.stored, node);
blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
b, dn);
return true;
} else {
blockLog.info("BLOCK* invalidateBlocks: {} on {} is the only copy and" +
" was not deleted", b, dn);
return false;
}
}
public void setPostponeBlocksFromFuture(boolean postpone) {
this.shouldPostponeBlocksFromFuture = postpone;
}
private void postponeBlock(Block blk) {
private void postponeBlock(BlockInfo blk) {
if (postponedMisreplicatedBlocks.add(blk)) {
postponedMisreplicatedBlocksCount.incrementAndGet();
}
@ -1374,7 +1368,7 @@ public class BlockManager implements BlockStatsMXBean {
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes;
DatanodeDescriptor srcNode;
BlockCollection bc = null;
BlockCollection bc;
int additionalReplRequired;
int scheduledWork = 0;
@ -1535,9 +1529,9 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeStorageInfo[] targets = rw.targets;
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
for (int k = 0; k < targets.length; k++) {
for (DatanodeStorageInfo target : targets) {
targetList.append(' ');
targetList.append(targets[k].getDatanodeDescriptor());
targetList.append(target.getDatanodeDescriptor());
}
blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
rw.block, targetList);
@ -1614,8 +1608,8 @@ public class BlockManager implements BlockStatsMXBean {
List<DatanodeDescriptor> datanodeDescriptors = null;
if (nodes != null) {
datanodeDescriptors = new ArrayList<>(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
for (String nodeStr : nodes) {
DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodeStr);
if (node != null) {
datanodeDescriptors.add(node);
}
@ -1654,7 +1648,7 @@ public class BlockManager implements BlockStatsMXBean {
* the given block
*/
@VisibleForTesting
DatanodeDescriptor chooseSourceDatanode(Block block,
DatanodeDescriptor chooseSourceDatanode(BlockInfo block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
NumberReplicas numReplicas,
@ -1734,16 +1728,16 @@ public class BlockManager implements BlockStatsMXBean {
if (timedOutItems != null) {
namesystem.writeLock();
try {
for (int i = 0; i < timedOutItems.length; i++) {
for (BlockInfo timedOutItem : timedOutItems) {
/*
* Use the blockinfo from the blocksmap to be certain we're working
* with the most up-to-date block information (e.g. genstamp).
*/
BlockInfo bi = getStoredBlock(timedOutItems[i]);
BlockInfo bi = getStoredBlock(timedOutItem);
if (bi == null) {
continue;
}
NumberReplicas num = countNodes(timedOutItems[i]);
NumberReplicas num = countNodes(timedOutItem);
if (isNeededReplication(bi, getReplication(bi), num.liveReplicas())) {
neededReplications.add(bi, num.liveReplicas(),
num.decommissionedAndDecommissioning(), getReplication(bi));
@ -1760,7 +1754,7 @@ public class BlockManager implements BlockStatsMXBean {
public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
assert namesystem.hasReadLock();
DatanodeDescriptor node = null;
DatanodeDescriptor node;
try {
node = datanodeManager.getDatanode(nodeReg);
} catch (UnregisteredNodeException e) {
@ -2022,7 +2016,7 @@ public class BlockManager implements BlockStatsMXBean {
startIndex += (base+1);
}
}
Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
Iterator<BlockInfo> it = postponedMisreplicatedBlocks.iterator();
for (int tmp = 0; tmp < startIndex; tmp++) {
it.next();
}
@ -2117,7 +2111,7 @@ public class BlockManager implements BlockStatsMXBean {
long oldGenerationStamp, long oldNumBytes,
DatanodeStorageInfo[] newStorages) throws IOException {
assert namesystem.hasWriteLock();
BlockToMarkCorrupt b = null;
BlockToMarkCorrupt b;
if (block.getGenerationStamp() != oldGenerationStamp) {
b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
"genstamp does not match " + oldGenerationStamp
@ -2719,7 +2713,7 @@ public class BlockManager implements BlockStatsMXBean {
" but corrupt replicas map has " + corruptReplicasCount);
}
if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
invalidateCorruptReplicas(storedBlock, reportedBlock);
invalidateCorruptReplicas(storedBlock, reportedBlock, num);
}
return storedBlock;
}
@ -2752,18 +2746,20 @@ public class BlockManager implements BlockStatsMXBean {
*
* @param blk Block whose corrupt replicas need to be invalidated
*/
private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
private void invalidateCorruptReplicas(BlockInfo blk, Block reported,
NumberReplicas numberReplicas) {
Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
boolean removedFromBlocksMap = true;
if (nodes == null)
return;
// make a copy of the array of nodes in order to avoid
// ConcurrentModificationException, when the block is removed from the node
DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
DatanodeDescriptor[] nodesCopy = nodes.toArray(
new DatanodeDescriptor[nodes.size()]);
for (DatanodeDescriptor node : nodesCopy) {
try {
if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
Reason.ANY), node)) {
Reason.ANY), node, numberReplicas)) {
removedFromBlocksMap = false;
}
} catch (IOException e) {
@ -2813,7 +2809,6 @@ public class BlockManager implements BlockStatsMXBean {
replicationQueuesInitializer.join();
} catch (final InterruptedException e) {
LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning..");
return;
} finally {
replicationQueuesInitializer = null;
}
@ -3175,8 +3170,7 @@ public class BlockManager implements BlockStatsMXBean {
CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
.get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false));
if (cblock != null) {
boolean removed = false;
removed |= node.getPendingCached().remove(cblock);
boolean removed = node.getPendingCached().remove(cblock);
removed |= node.getCached().remove(cblock);
removed |= node.getPendingUncached().remove(cblock);
if (removed) {
@ -3392,7 +3386,7 @@ public class BlockManager implements BlockStatsMXBean {
int excess = 0;
int stale = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
for (DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++;
@ -3413,7 +3407,8 @@ public class BlockManager implements BlockStatsMXBean {
stale++;
}
}
return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale);
return new NumberReplicas(live, decommissioned, decommissioning, corrupt,
excess, stale);
}
/**
@ -3596,8 +3591,6 @@ public class BlockManager implements BlockStatsMXBean {
String src, BlockInfo[] blocks) {
for (BlockInfo b: blocks) {
if (!b.isComplete()) {
final BlockInfoUnderConstruction uc =
(BlockInfoUnderConstruction)b;
final int numNodes = b.numNodes();
final int min = getMinStorageNum(b);
final BlockUCState state = b.getBlockUCState();
@ -3723,11 +3716,7 @@ public class BlockManager implements BlockStatsMXBean {
return blocksMap.getBlockCollection(b);
}
public int numCorruptReplicas(Block block) {
return corruptReplicas.numCorruptReplicas(block);
}
public void removeBlockFromMap(Block block) {
public void removeBlockFromMap(BlockInfo block) {
removeFromExcessReplicateMap(block);
blocksMap.removeBlock(block);
// If block is removed from blocksMap remove it from corruptReplicasMap
@ -3737,7 +3726,7 @@ public class BlockManager implements BlockStatsMXBean {
/**
* If a block is removed from blocksMap, remove it from excessReplicateMap.
*/
private void removeFromExcessReplicateMap(Block block) {
private void removeFromExcessReplicateMap(BlockInfo block) {
for (DatanodeStorageInfo info : getStorages(block)) {
String uuid = info.getDatanodeDescriptor().getDatanodeUuid();
LightWeightLinkedSet<BlockInfo> excessReplicas =
@ -3768,14 +3757,14 @@ public class BlockManager implements BlockStatsMXBean {
/**
* Get the replicas which are corrupt for a given block.
*/
public Collection<DatanodeDescriptor> getCorruptReplicas(Block block) {
public Collection<DatanodeDescriptor> getCorruptReplicas(BlockInfo block) {
return corruptReplicas.getNodes(block);
}
/**
* Get reason for certain corrupted replicas for a given block and a given dn.
*/
public String getCorruptReason(Block block, DatanodeDescriptor node) {
public String getCorruptReason(BlockInfo block, DatanodeDescriptor node) {
return corruptReplicas.getCorruptReason(block, node);
}
@ -3869,7 +3858,7 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.clearPendingQueues();
postponedMisreplicatedBlocks.clear();
postponedMisreplicatedBlocksCount.set(0);
};
}
public static LocatedBlock newLocatedBlock(
ExtendedBlock b, DatanodeStorageInfo[] storages,

View File

@ -117,7 +117,7 @@ class BlocksMap {
* remove it from all data-node lists it belongs to;
* and remove all data-node locations associated with the block.
*/
void removeBlock(Block block) {
void removeBlock(BlockInfo block) {
BlockInfo blockInfo = blocks.remove(block);
if (blockInfo == null)
return;
@ -190,7 +190,7 @@ class BlocksMap {
// remove block from the data-node list and the node from the block info
boolean removed = node.removeBlock(info);
if (info.getDatanode(0) == null // no datanodes left
if (info.hasEmptyStorage() // no datanodes left
&& info.isDeleted()) { // does not belong to a file
blocks.remove(b); // remove block from the map
}

View File

@ -45,13 +45,12 @@ class ContiguousBlockStorageOp {
return last;
}
static boolean addStorage(BlockInfo b, DatanodeStorageInfo storage) {
static void addStorage(BlockInfo b, DatanodeStorageInfo storage) {
// find the last null node
int lastNode = ensureCapacity(b, 1);
b.setStorageInfo(lastNode, storage);
b.setNext(lastNode, null);
b.setPrevious(lastNode, null);
return true;
}
static boolean removeStorage(BlockInfo b,
@ -103,4 +102,8 @@ class ContiguousBlockStorageOp {
"newBlock already exists.");
}
}
static boolean hasEmptyStorage(BlockInfo b) {
return b.getStorageInfo(0) == null;
}
}

View File

@ -17,8 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.Server;
@ -46,8 +46,12 @@ public class CorruptReplicasMap{
CORRUPTION_REPORTED // client or datanode reported the corruption
}
private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
/**
* Used to track corrupted replicas (for contiguous block) or internal blocks
* (for striped block) and the corresponding DataNodes. For a striped block,
* the key here is the striped block group object stored in the blocksMap.
*/
private final SortedMap<BlockInfo, Map<DatanodeDescriptor, Reason>> corruptReplicasMap = new TreeMap<>();
/**
* Mark the block belonging to datanode as corrupt.
@ -57,11 +61,11 @@ public class CorruptReplicasMap{
* @param reason a textual reason (for logging purposes)
* @param reasonCode the enum representation of the reason
*/
void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
void addToCorruptReplicasMap(BlockInfo blk, DatanodeDescriptor dn,
String reason, Reason reasonCode) {
Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
if (nodes == null) {
nodes = new HashMap<DatanodeDescriptor, Reason>();
nodes = new HashMap<>();
corruptReplicasMap.put(blk, nodes);
}
@ -92,7 +96,7 @@ public class CorruptReplicasMap{
*
* @param blk Block to be removed
*/
void removeFromCorruptReplicasMap(Block blk) {
void removeFromCorruptReplicasMap(BlockInfo blk) {
if (corruptReplicasMap != null) {
corruptReplicasMap.remove(blk);
}
@ -105,12 +109,13 @@ public class CorruptReplicasMap{
* @return true if the removal is successful;
false if the replica is not in the map
*/
boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
boolean removeFromCorruptReplicasMap(BlockInfo blk,
DatanodeDescriptor datanode) {
return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
}
boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
Reason reason) {
boolean removeFromCorruptReplicasMap(BlockInfo blk,
DatanodeDescriptor datanode, Reason reason) {
Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
if (datanodes==null)
return false;
@ -139,11 +144,9 @@ public class CorruptReplicasMap{
* @param blk Block for which nodes are requested
* @return collection of nodes. Null if does not exists
*/
Collection<DatanodeDescriptor> getNodes(Block blk) {
Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
if (nodes == null)
return null;
return nodes.keySet();
Collection<DatanodeDescriptor> getNodes(BlockInfo blk) {
Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
return nodes != null ? nodes.keySet() : null;
}
/**
@ -153,12 +156,12 @@ public class CorruptReplicasMap{
* @param node DatanodeDescriptor which holds the replica
* @return true if replica is corrupt, false if does not exists in this map
*/
boolean isReplicaCorrupt(Block blk, DatanodeDescriptor node) {
boolean isReplicaCorrupt(BlockInfo blk, DatanodeDescriptor node) {
Collection<DatanodeDescriptor> nodes = getNodes(blk);
return ((nodes != null) && (nodes.contains(node)));
}
int numCorruptReplicas(Block blk) {
int numCorruptReplicas(BlockInfo blk) {
Collection<DatanodeDescriptor> nodes = getNodes(blk);
return (nodes == null) ? 0 : nodes.size();
}
@ -181,44 +184,39 @@ public class CorruptReplicasMap{
* @return Up to numExpectedBlocks blocks from startingBlockId if it exists
*
*/
@VisibleForTesting
long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
Long startingBlockId) {
if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
return null;
}
Iterator<Block> blockIt = corruptReplicasMap.keySet().iterator();
Iterator<BlockInfo> blockIt = corruptReplicasMap.keySet().iterator();
// if the starting block id was specified, iterate over keys until
// we find the matching block. If we find a matching block, break
// to leave the iterator on the next block after the specified block.
if (startingBlockId != null) {
boolean isBlockFound = false;
while (blockIt.hasNext()) {
Block b = blockIt.next();
BlockInfo b = blockIt.next();
if (b.getBlockId() == startingBlockId) {
isBlockFound = true;
break;
}
}
if (!isBlockFound) {
return null;
}
}
ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();
ArrayList<Long> corruptReplicaBlockIds = new ArrayList<>();
// append up to numExpectedBlocks blockIds to our list
for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
corruptReplicaBlockIds.add(blockIt.next().getBlockId());
}
long[] ret = new long[corruptReplicaBlockIds.size()];
for(int i=0; i<ret.length; i++) {
ret[i] = corruptReplicaBlockIds.get(i);
}
return ret;
}
@ -229,7 +227,7 @@ public class CorruptReplicasMap{
* @param node datanode that contains this corrupted replica
* @return reason
*/
String getCorruptReason(Block block, DatanodeDescriptor node) {
String getCorruptReason(BlockInfo block, DatanodeDescriptor node) {
Reason reason = null;
if(corruptReplicasMap.containsKey(block)) {
if (corruptReplicasMap.get(block).containsKey(node)) {

View File

@ -71,7 +71,7 @@ class FSDirWriteFileOp {
private FSDirWriteFileOp() {}
static boolean unprotectedRemoveBlock(
FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode,
Block block) throws IOException {
BlockInfo block) throws IOException {
// modify file-> block and blocksMap
// fileNode should be under construction
BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block);
@ -136,7 +136,9 @@ class FSDirWriteFileOp {
fsd.writeLock();
try {
// Remove the block from the pending creates list
if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) {
BlockInfo storedBlock = fsd.getBlockManager().getStoredBlock(localBlock);
if (storedBlock != null &&
!unprotectedRemoveBlock(fsd, src, iip, file, storedBlock)) {
return;
}
} finally {

View File

@ -1035,7 +1035,7 @@ public class FSEditLogLoader {
throw new IOException("Trying to remove more than one block from file "
+ path);
}
Block oldBlock = oldBlocks[oldBlocks.length - 1];
BlockInfo oldBlock = oldBlocks[oldBlocks.length - 1];
boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock(
fsDir, path, iip, file, oldBlock);
if (!removed && !(op instanceof UpdateBlocksOp)) {

View File

@ -267,10 +267,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
out.println("No. of corrupted Replica: " +
numberReplicas.corruptReplicas());
//record datanodes that have corrupted block replica
Collection<DatanodeDescriptor> corruptionRecord = null;
if (bm.getCorruptReplicas(block) != null) {
corruptionRecord = bm.getCorruptReplicas(block);
}
Collection<DatanodeDescriptor> corruptionRecord =
bm.getCorruptReplicas(blockInfo);
//report block replicas status on datanodes
for(int idx = (blockInfo.numNodes()-1); idx >= 0; idx--) {
@ -279,7 +277,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
dn.getNetworkLocation() + " ");
if (corruptionRecord != null && corruptionRecord.contains(dn)) {
out.print(CORRUPT_STATUS+"\t ReasonCode: "+
bm.getCorruptReason(block,dn));
bm.getCorruptReason(blockInfo, dn));
} else if (dn.isDecommissioned() ){
out.print(DECOMMISSIONED_STATUS);
} else if (dn.isDecommissionInProgress()) {
@ -650,7 +648,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
LightWeightLinkedSet<BlockInfo> blocksExcess =
bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
Collection<DatanodeDescriptor> corruptReplicas =
bm.getCorruptReplicas(block.getLocalBlock());
bm.getCorruptReplicas(storedBlock);
sb.append("(");
if (dnDesc.isDecommissioned()) {
sb.append("DECOMMISSIONED)");
@ -658,7 +656,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
sb.append("DECOMMISSIONING)");
} else if (corruptReplicas != null && corruptReplicas.contains(dnDesc)) {
sb.append("CORRUPT)");
} else if (blocksExcess != null && blocksExcess.contains(block.getLocalBlock())) {
} else if (blocksExcess != null && blocksExcess.contains(storedBlock)) {
sb.append("EXCESS)");
} else if (dnDesc.isStale(this.staleInterval)) {
sb.append("STALE_NODE)");

View File

@ -560,7 +560,8 @@ public class DFSTestUtil {
throws TimeoutException, InterruptedException {
int count = 0;
final int ATTEMPTS = 50;
int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
int repls = BlockManagerTestUtil.numCorruptReplicas(ns.getBlockManager(),
b.getLocalBlock());
while (repls != corruptRepls && count < ATTEMPTS) {
try {
IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
@ -572,7 +573,8 @@ public class DFSTestUtil {
count++;
// check more often so corrupt block reports are not easily missed
for (int i = 0; i < 10; i++) {
repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
repls = BlockManagerTestUtil.numCorruptReplicas(ns.getBlockManager(),
b.getLocalBlock());
Thread.sleep(100);
if (repls == corruptRepls) {
break;

View File

@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
@ -87,7 +88,7 @@ public class BlockManagerTestUtil {
final Block b) {
final Set<String> rackSet = new HashSet<String>(0);
final Collection<DatanodeDescriptor> corruptNodes =
getCorruptReplicas(blockManager).getNodes(b);
getCorruptReplicas(blockManager).getNodes(blockManager.getStoredBlock(b));
for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
@ -306,4 +307,8 @@ public class BlockManagerTestUtil {
throws ExecutionException, InterruptedException {
dm.getDecomManager().runMonitor();
}
public static int numCorruptReplicas(BlockManager bm, Block block) {
return bm.corruptReplicas.numCorruptReplicas(bm.getStoredBlock(block));
}
}

View File

@ -63,9 +63,7 @@ public class TestBlockInfo {
final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
boolean added = blockInfo.addStorage(storage, blockInfo);
Assert.assertTrue(added);
blockInfo.addStorage(storage, blockInfo);
Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
}
@ -73,7 +71,7 @@ public class TestBlockInfo {
public void testCopyConstructor() {
BlockInfo old = new BlockInfoContiguous((short) 3);
try {
BlockInfo copy = new BlockInfoContiguous((BlockInfoContiguous)old);
BlockInfo copy = new BlockInfoContiguous(old);
assertEquals(old.getBlockCollection(), copy.getBlockCollection());
assertEquals(old.getCapacity(), copy.getCapacity());
} catch (Exception e) {
@ -110,8 +108,8 @@ public class TestBlockInfo {
final int MAX_BLOCKS = 10;
DatanodeStorageInfo dd = DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1");
ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
ArrayList<Block> blockList = new ArrayList<>(MAX_BLOCKS);
ArrayList<BlockInfo> blockInfoList = new ArrayList<>();
int headIndex;
int curIndex;

View File

@ -509,7 +509,7 @@ public class TestBlockManager {
+ " even if all available source nodes have reached their replication"
+ " limits below the hard limit.",
bm.chooseSourceDatanode(
aBlock,
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
@ -519,7 +519,7 @@ public class TestBlockManager {
+ " replication since all available source nodes have reached"
+ " their replication limits.",
bm.chooseSourceDatanode(
aBlock,
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
@ -532,7 +532,7 @@ public class TestBlockManager {
assertNull("Does not choose a source node for a highest-priority"
+ " replication when all available nodes exceed the hard limit.",
bm.chooseSourceDatanode(
aBlock,
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
@ -558,7 +558,7 @@ public class TestBlockManager {
+ " if all available source nodes have reached their replication"
+ " limits below the hard limit.",
bm.chooseSourceDatanode(
aBlock,
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),
@ -572,7 +572,7 @@ public class TestBlockManager {
assertNull("Does not choose a source decommissioning node for a normal"
+ " replication when all available nodes exceed the hard limit.",
bm.chooseSourceDatanode(
aBlock,
bm.getStoredBlock(aBlock),
cntNodes,
liveNodes,
new NumberReplicas(),

View File

@ -48,20 +48,19 @@ public class TestCorruptReplicaInfo {
private static final Log LOG =
LogFactory.getLog(TestCorruptReplicaInfo.class);
private final Map<Long, Block> block_map =
new HashMap<Long, Block>();
private final Map<Long, BlockInfo> block_map = new HashMap<>();
// Allow easy block creation by block id
// Return existing block if one with same block id already exists
private Block getBlock(Long block_id) {
private BlockInfo getBlock(Long block_id) {
if (!block_map.containsKey(block_id)) {
block_map.put(block_id, new Block(block_id,0,0));
block_map.put(block_id,
new BlockInfoContiguous(new Block(block_id, 0, 0), (short) 1));
}
return block_map.get(block_id);
}
private Block getBlock(int block_id) {
private BlockInfo getBlock(int block_id) {
return getBlock((long)block_id);
}
@ -82,7 +81,7 @@ public class TestCorruptReplicaInfo {
// create a list of block_ids. A list is used to allow easy validation of the
// output of getCorruptReplicaBlockIds
int NUM_BLOCK_IDS = 140;
List<Long> block_ids = new LinkedList<Long>();
List<Long> block_ids = new LinkedList<>();
for (int i=0;i<NUM_BLOCK_IDS;i++) {
block_ids.add((long)i);
}
@ -130,7 +129,7 @@ public class TestCorruptReplicaInfo {
}
private static void addToCorruptReplicasMap(CorruptReplicasMap crm,
Block blk, DatanodeDescriptor dn) {
BlockInfo blk, DatanodeDescriptor dn) {
crm.addToCorruptReplicasMap(blk, dn, "TEST", Reason.NONE);
}
}