Revert "HDFS-8652. Track BlockInfo instead of Block in CorruptReplicasMap. Contributed by Jing Zhao."

This reverts commit d62b63d297bff12d93de560dd50ddd48743b851d.

(cherry picked from commit bc99aaffe7b0ed13b1efc37b6a32cdbd344c2d75)
Jing Zhao 2015-07-07 10:08:30 -07:00
parent 62424b5d99
commit bed74b620a
17 changed files with 168 additions and 169 deletions

CHANGES.txt

@@ -362,8 +362,6 @@ Release 2.8.0 - UNRELEASED
     HDFS-8709. Clarify automatic sync in FSEditLog#logEdit. (wang)
 
-    HDFS-8652. Track BlockInfo instead of Block in CorruptReplicasMap. (jing9)
-
     OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

BlockInfo.java

@@ -182,7 +182,7 @@ public int getCapacity() {
    * information indicating the index of the block in the
    * corresponding block group.
    */
-  abstract void addStorage(DatanodeStorageInfo storage, Block reportedBlock);
+  abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
 
   /**
    * Remove {@link DatanodeStorageInfo} location for a block
@@ -195,11 +195,6 @@ public int getCapacity() {
    */
   abstract void replaceBlock(BlockInfo newBlock);
 
-  /**
-   * @return true if there is no storage storing the block
-   */
-  abstract boolean hasEmptyStorage();
-
   /**
    * Find specified DatanodeStorageInfo.
    * @return DatanodeStorageInfo or null if not found.

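Note: the two hunks above amount to an API-contract toggle. A minimal sketch of the caller-visible behavior the revert restores, with names taken from the diff and the call site assumed:

    // Post-revert: addStorage reports whether the storage was linked in,
    // so a caller can check the result directly (TestBlockInfo below does).
    boolean added = blockInfo.addStorage(storage, reportedBlock);
    assert added;
    // Pre-revert the method returned void, and emptiness was probed
    // separately through the now-removed hasEmptyStorage().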
BlockInfoContiguous.java

@@ -45,8 +45,8 @@ protected BlockInfoContiguous(BlockInfo from) {
   }
 
   @Override
-  void addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
-    ContiguousBlockStorageOp.addStorage(this, storage);
+  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+    return ContiguousBlockStorageOp.addStorage(this, storage);
   }
 
   @Override
@@ -73,9 +73,4 @@ BlockInfoUnderConstruction convertCompleteBlockToUC(
     ucBlock.setBlockCollection(getBlockCollection());
     return ucBlock;
   }
-
-  @Override
-  boolean hasEmptyStorage() {
-    return ContiguousBlockStorageOp.hasEmptyStorage(this);
-  }
 }

BlockInfoUnderConstruction.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -273,17 +274,18 @@ public void initializeBlockRecovery(long recoveryId) {
           "No blocks found, lease removed.");
     }
     boolean allLiveReplicasTriedAsPrimary = true;
-    for (ReplicaUnderConstruction replica : replicas) {
+    for (int i = 0; i < replicas.size(); i++) {
       // Check if all replicas have been tried or not.
-      if (replica.isAlive()) {
-        allLiveReplicasTriedAsPrimary = allLiveReplicasTriedAsPrimary
-            && replica.getChosenAsPrimary();
+      if (replicas.get(i).isAlive()) {
+        allLiveReplicasTriedAsPrimary =
+            (allLiveReplicasTriedAsPrimary &&
+                replicas.get(i).getChosenAsPrimary());
       }
     }
     if (allLiveReplicasTriedAsPrimary) {
       // Just set all the replicas to be chosen whether they are alive or not.
-      for (ReplicaUnderConstruction replica : replicas) {
-        replica.setChosenAsPrimary(false);
+      for (int i = 0; i < replicas.size(); i++) {
+        replicas.get(i).setChosenAsPrimary(false);
       }
     }
     long mostRecentLastUpdate = 0;
@@ -343,6 +345,10 @@ void addReplicaIfNotPresent(DatanodeStorageInfo storage,
    * Convert an under construction block to a complete block.
    *
    * @return a complete block.
+   * @throws IOException
+   *           if the state of the block (the generation stamp and the length)
+   *           has not been committed by the client or it does not have at
+   *           least a minimal number of replicas reported from data-nodes.
    */
   public abstract BlockInfo convertToCompleteBlock();
@@ -380,8 +386,8 @@ public void appendStringTo(StringBuilder sb) {
   }
 
   private void appendUCParts(StringBuilder sb) {
-    sb.append("{UCState=").append(blockUCState).append(", truncateBlock=")
-        .append(truncateBlock)
+    sb.append("{UCState=").append(blockUCState)
+        .append(", truncateBlock=" + truncateBlock)
         .append(", primaryNodeIndex=").append(primaryNodeIndex)
         .append(", replicas=[");
     if (replicas != null) {

BlockInfoUnderConstructionContiguous.java

@@ -55,6 +55,10 @@ public BlockInfoUnderConstructionContiguous(Block blk, short replication,
    * Convert an under construction block to a complete block.
    *
    * @return BlockInfo - a complete block.
+   * @throws IOException if the state of the block
+   * (the generation stamp and the length) has not been committed by
+   * the client or it does not have at least a minimal number of replicas
+   * reported from data-nodes.
    */
   @Override
   public BlockInfoContiguous convertToCompleteBlock() {
@@ -65,8 +69,8 @@ public BlockInfoContiguous convertToCompleteBlock() {
   }
 
   @Override
-  void addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
-    ContiguousBlockStorageOp.addStorage(this, storage);
+  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+    return ContiguousBlockStorageOp.addStorage(this, storage);
   }
 
   @Override
@@ -84,11 +88,6 @@ void replaceBlock(BlockInfo newBlock) {
     ContiguousBlockStorageOp.replaceBlock(this, newBlock);
   }
 
-  @Override
-  boolean hasEmptyStorage() {
-    return ContiguousBlockStorageOp.hasEmptyStorage(this);
-  }
-
   @Override
   public void setExpectedLocations(DatanodeStorageInfo[] targets) {
     int numLocations = targets == null ? 0 : targets.length;

BlockManager.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.IOException;
@@ -195,7 +196,7 @@ public int getPendingDataNodeMessageCount() {
    * notified of all block deletions that might have been pending
    * when the failover happened.
    */
-  private final Set<BlockInfo> postponedMisreplicatedBlocks = Sets.newHashSet();
+  private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
 
   /**
    * Maps a StorageID to the set of blocks that are "extra" for this
@@ -336,7 +337,8 @@ public BlockManager(final Namesystem namesystem, final Configuration conf)
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
     this.shouldCheckForEnoughRacks =
-        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
+        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
+            ? false : true;
 
     this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
@@ -455,7 +457,8 @@ private boolean isBlockTokenEnabled() {
 
   /** Should the access keys be updated? */
   boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
-    return isBlockTokenEnabled() && blockTokenSecretManager.updateKeys(updateTime);
+    return isBlockTokenEnabled()? blockTokenSecretManager.updateKeys(updateTime)
+        : false;
   }
 
   public void activate(Configuration conf) {
@@ -508,14 +511,14 @@ public void metaSave(PrintWriter out) {
     synchronized (neededReplications) {
       out.println("Metasave: Blocks waiting for replication: " +
                   neededReplications.size());
-      for (BlockInfo block : neededReplications) {
+      for (Block block : neededReplications) {
         dumpBlockMeta(block, out);
       }
     }
 
     // Dump any postponed over-replicated blocks
     out.println("Mis-replicated blocks that have been postponed:");
-    for (BlockInfo block : postponedMisreplicatedBlocks) {
+    for (Block block : postponedMisreplicatedBlocks) {
       dumpBlockMeta(block, out);
     }
 
@@ -533,9 +536,11 @@ public void metaSave(PrintWriter out) {
    * Dump the metadata for the given block in a human-readable
    * form.
    */
-  private void dumpBlockMeta(BlockInfo block, PrintWriter out) {
-    List<DatanodeDescriptor> containingNodes = new ArrayList<>();
-    List<DatanodeStorageInfo> containingLiveReplicasNodes = new ArrayList<>();
+  private void dumpBlockMeta(Block block, PrintWriter out) {
+    List<DatanodeDescriptor> containingNodes =
+        new ArrayList<DatanodeDescriptor>();
+    List<DatanodeStorageInfo> containingLiveReplicasNodes =
+        new ArrayList<>();
 
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
@@ -543,16 +548,17 @@ private void dumpBlockMeta(Block block, PrintWriter out) {
         containingLiveReplicasNodes, numReplicas,
         UnderReplicatedBlocks.LEVEL);
 
-    // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which
-    // are not included in the numReplicas.liveReplicas() count
+    // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
+    // not included in the numReplicas.liveReplicas() count
    assert containingLiveReplicasNodes.size() >= numReplicas.liveReplicas();
 
    int usableReplicas = numReplicas.liveReplicas() +
                         numReplicas.decommissionedAndDecommissioning();
 
-    BlockCollection bc = block.getBlockCollection();
-    String fileName = (bc == null) ? "[orphaned]" : bc.getName();
-    out.print(fileName + ": ");
+    if (block instanceof BlockInfo) {
+      BlockCollection bc = ((BlockInfo) block).getBlockCollection();
+      String fileName = (bc == null) ? "[orphaned]" : bc.getName();
+      out.print(fileName + ": ");
+    }
 
    // l: == live:, d: == decommissioned c: == corrupt e: == excess
    out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
@@ -561,8 +567,8 @@ private void dumpBlockMeta(BlockInfo block, PrintWriter out) {
              " c: " + numReplicas.corruptReplicas() +
              " e: " + numReplicas.excessReplicas() + ") ");
 
-    Collection<DatanodeDescriptor> corruptNodes =
-        corruptReplicas.getNodes(block);
+    Collection<DatanodeDescriptor> corruptNodes =
+                                  corruptReplicas.getNodes(block);
 
    for (DatanodeStorageInfo storage : getStorages(block)) {
      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@ -799,8 +805,7 @@ private List<LocatedBlock> createLocatedBlockList(
       final long offset, final long length, final int nrBlocksToReturn,
       final AccessMode mode) throws IOException {
     int curBlk;
-    long curPos = 0;
-    long blkSize;
+    long curPos = 0, blkSize = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
     for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
       blkSize = blocks[curBlk].getNumBytes();
@@ -1190,11 +1195,10 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
   }
 
   /**
-   * Mark a replica (of a contiguous block) or an internal block (of a striped
-   * block group) as corrupt.
-   * @param b Indicating the reported bad block and the corresponding BlockInfo
-   *          stored in blocksMap.
+   *
+   * @param b
    * @param storageInfo storage that contains the block, if known. null otherwise.
+   * @throws IOException
    */
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
       DatanodeStorageInfo storageInfo,
@@ -1215,7 +1219,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
     }
 
     // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(b.stored, node, b.reason,
+    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
         b.reasonCode);
 
     NumberReplicas numberOfReplicas = countNodes(b.stored);
@@ -1237,7 +1241,7 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
     if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
         || corruptedDuringWrite) {
       // the block is over-replicated so invalidate the replicas immediately
-      invalidateBlock(b, node, numberOfReplicas);
+      invalidateBlock(b, node);
     } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
       updateNeededReplications(b.stored, -1, 0);
@@ -1245,15 +1249,12 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
   }
 
   /**
-   * Invalidates the given block on the given datanode. Note that before this
-   * call we have already checked the current live replicas of the block and
-   * make sure it's safe to invalidate the replica.
-   *
-   * @return true if the replica was successfully invalidated and no longer
-   *         associated with the DataNode.
+   * Invalidates the given block on the given datanode.
+   * @return true if the block was successfully invalidated and no longer
+   *         present in the BlocksMap
    */
-  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
-      NumberReplicas nr) throws IOException {
+  private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
+      ) throws IOException {
     blockLog.info("BLOCK* invalidateBlock: {} on {}", b, dn);
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
@@ -1262,30 +1263,35 @@ private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
     }
 
     // Check how many copies we have of the block
+    NumberReplicas nr = countNodes(b.stored);
     if (nr.replicasOnStaleNodes() > 0) {
       blockLog.info("BLOCK* invalidateBlocks: postponing " +
           "invalidation of {} on {} because {} replica(s) are located on " +
           "nodes with potentially out-of-date block reports", b, dn,
           nr.replicasOnStaleNodes());
-      postponeBlock(b.stored);
+      postponeBlock(b.corrupted);
       return false;
-    } else {
-      // we already checked the number of replicas in the caller of this
-      // function and we know there is at least one copy on a live node, so we
-      // can delete it.
+    } else if (nr.liveReplicas() >= 1) {
+      // If we have at least one copy on a live node, then we can delete it.
       addToInvalidates(b.corrupted, dn);
       removeStoredBlock(b.stored, node);
       blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.",
           b, dn);
       return true;
+    } else {
+      blockLog.info("BLOCK* invalidateBlocks: {} on {} is the only copy and" +
+          " was not deleted", b, dn);
+      return false;
     }
   }
 
   public void setPostponeBlocksFromFuture(boolean postpone) {
     this.shouldPostponeBlocksFromFuture = postpone;
   }
 
-  private void postponeBlock(BlockInfo blk) {
+  private void postponeBlock(Block blk) {
     if (postponedMisreplicatedBlocks.add(blk)) {
       postponedMisreplicatedBlocksCount.incrementAndGet();
     }
@@ -1359,7 +1365,7 @@ int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
-    BlockCollection bc;
+    BlockCollection bc = null;
     int additionalReplRequired;
 
     int scheduledWork = 0;
@@ -1518,9 +1524,9 @@ int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
           DatanodeStorageInfo[] targets = rw.targets;
           if (targets != null && targets.length != 0) {
             StringBuilder targetList = new StringBuilder("datanode(s)");
-            for (DatanodeStorageInfo target : targets) {
+            for (int k = 0; k < targets.length; k++) {
               targetList.append(' ');
-              targetList.append(target.getDatanodeDescriptor());
+              targetList.append(targets[k].getDatanodeDescriptor());
             }
             blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
                 rw.block, targetList);
@@ -1597,8 +1603,8 @@ List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
     List<DatanodeDescriptor> datanodeDescriptors = null;
     if (nodes != null) {
       datanodeDescriptors = new ArrayList<>(nodes.size());
-      for (String nodeStr : nodes) {
-        DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodeStr);
+      for (int i = 0; i < nodes.size(); i++) {
+        DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
         if (node != null) {
           datanodeDescriptors.add(node);
         }
@@ -1637,7 +1643,7 @@ List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
    *         the given block
    */
   @VisibleForTesting
-  DatanodeDescriptor chooseSourceDatanode(BlockInfo block,
+  DatanodeDescriptor chooseSourceDatanode(Block block,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeStorageInfo> nodesContainingLiveReplicas,
       NumberReplicas numReplicas,
@@ -1717,16 +1723,16 @@ private void processPendingReplications() {
     if (timedOutItems != null) {
       namesystem.writeLock();
       try {
-        for (BlockInfo timedOutItem : timedOutItems) {
+        for (int i = 0; i < timedOutItems.length; i++) {
           /*
            * Use the blockinfo from the blocksmap to be certain we're working
            * with the most up-to-date block information (e.g. genstamp).
            */
-          BlockInfo bi = getStoredBlock(timedOutItem);
+          BlockInfo bi = getStoredBlock(timedOutItems[i]);
           if (bi == null) {
             continue;
           }
-          NumberReplicas num = countNodes(timedOutItem);
+          NumberReplicas num = countNodes(timedOutItems[i]);
           if (isNeededReplication(bi, getReplication(bi), num.liveReplicas())) {
             neededReplications.add(bi, num.liveReplicas(),
                 num.decommissionedAndDecommissioning(), getReplication(bi));
@@ -1743,7 +1749,7 @@ private void processPendingReplications() {
 
   public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
     assert namesystem.hasReadLock();
-    DatanodeDescriptor node;
+    DatanodeDescriptor node = null;
     try {
       node = datanodeManager.getDatanode(nodeReg);
     } catch (UnregisteredNodeException e) {
@@ -2005,7 +2011,7 @@ void rescanPostponedMisreplicatedBlocks() {
           startIndex += (base+1);
         }
       }
-      Iterator<BlockInfo> it = postponedMisreplicatedBlocks.iterator();
+      Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
       for (int tmp = 0; tmp < startIndex; tmp++) {
         it.next();
       }
@@ -2100,7 +2106,7 @@ public void markBlockReplicasAsCorrupt(Block oldBlock, BlockInfo block,
       long oldGenerationStamp, long oldNumBytes,
       DatanodeStorageInfo[] newStorages) throws IOException {
     assert namesystem.hasWriteLock();
-    BlockToMarkCorrupt b;
+    BlockToMarkCorrupt b = null;
     if (block.getGenerationStamp() != oldGenerationStamp) {
       b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
           "genstamp does not match " + oldGenerationStamp
@@ -2702,7 +2708,7 @@ private Block addStoredBlock(final BlockInfo block,
           " but corrupt replicas map has " + corruptReplicasCount);
     }
     if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
-      invalidateCorruptReplicas(storedBlock, reportedBlock, num);
+      invalidateCorruptReplicas(storedBlock, reportedBlock);
     }
     return storedBlock;
   }
@@ -2735,20 +2741,18 @@ private void logAddStoredBlock(BlockInfo storedBlock,
    *
    * @param blk Block whose corrupt replicas need to be invalidated
    */
-  private void invalidateCorruptReplicas(BlockInfo blk, Block reported,
-      NumberReplicas numberReplicas) {
+  private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
     boolean removedFromBlocksMap = true;
     if (nodes == null)
       return;
     // make a copy of the array of nodes in order to avoid
     // ConcurrentModificationException, when the block is removed from the node
-    DatanodeDescriptor[] nodesCopy = nodes.toArray(
-        new DatanodeDescriptor[nodes.size()]);
+    DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
         if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
-            Reason.ANY), node, numberReplicas)) {
+            Reason.ANY), node)) {
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {
@@ -2798,6 +2802,7 @@ private void stopReplicationInitializer() {
         replicationQueuesInitializer.join();
       } catch (final InterruptedException e) {
         LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning..");
+        return;
       } finally {
         replicationQueuesInitializer = null;
       }
@@ -3159,7 +3164,8 @@ public void removeStoredBlock(BlockInfo storedBlock,
     CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
         .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false));
     if (cblock != null) {
-      boolean removed = node.getPendingCached().remove(cblock);
+      boolean removed = false;
+      removed |= node.getPendingCached().remove(cblock);
       removed |= node.getCached().remove(cblock);
       removed |= node.getPendingUncached().remove(cblock);
       if (removed) {
@@ -3375,7 +3381,7 @@ public NumberReplicas countNodes(BlockInfo b) {
     int excess = 0;
     int stale = 0;
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    for (DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
         corrupt++;
@@ -3396,8 +3402,7 @@ public NumberReplicas countNodes(BlockInfo b) {
         stale++;
       }
     }
-    return new NumberReplicas(live, decommissioned, decommissioning, corrupt,
-        excess, stale);
+    return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale);
   }
 
   /**
@@ -3580,6 +3585,8 @@ public boolean checkBlocksProperlyReplicated(
       String src, BlockInfo[] blocks) {
     for (BlockInfo b: blocks) {
       if (!b.isComplete()) {
+        final BlockInfoUnderConstruction uc =
+            (BlockInfoUnderConstruction)b;
         final int numNodes = b.numNodes();
         final int min = getMinStorageNum(b);
         final BlockUCState state = b.getBlockUCState();
@@ -3705,7 +3712,11 @@ public BlockCollection getBlockCollection(Block b) {
     return blocksMap.getBlockCollection(b);
   }
 
-  public void removeBlockFromMap(BlockInfo block) {
+  public int numCorruptReplicas(Block block) {
+    return corruptReplicas.numCorruptReplicas(block);
+  }
+
+  public void removeBlockFromMap(Block block) {
     removeFromExcessReplicateMap(block);
     blocksMap.removeBlock(block);
     // If block is removed from blocksMap remove it from corruptReplicasMap
@@ -3715,7 +3726,7 @@ public void removeBlockFromMap(BlockInfo block) {
   /**
    * If a block is removed from blocksMap, remove it from excessReplicateMap.
    */
-  private void removeFromExcessReplicateMap(BlockInfo block) {
+  private void removeFromExcessReplicateMap(Block block) {
     for (DatanodeStorageInfo info : getStorages(block)) {
       String uuid = info.getDatanodeDescriptor().getDatanodeUuid();
       LightWeightLinkedSet<BlockInfo> excessReplicas =
@@ -3746,14 +3757,14 @@ public Iterator<BlockInfo> getCorruptReplicaBlockIterator() {
   /**
    * Get the replicas which are corrupt for a given block.
    */
-  public Collection<DatanodeDescriptor> getCorruptReplicas(BlockInfo block) {
+  public Collection<DatanodeDescriptor> getCorruptReplicas(Block block) {
     return corruptReplicas.getNodes(block);
   }
 
  /**
   * Get reason for certain corrupted replicas for a given block and a given dn.
   */
- public String getCorruptReason(BlockInfo block, DatanodeDescriptor node) {
+ public String getCorruptReason(Block block, DatanodeDescriptor node) {
    return corruptReplicas.getCorruptReason(block, node);
  }
 
@@ -3847,7 +3858,7 @@ public void clearQueues() {
     datanodeManager.clearPendingQueues();
     postponedMisreplicatedBlocks.clear();
     postponedMisreplicatedBlocksCount.set(0);
-  }
+  };
 
   public static LocatedBlock newLocatedBlock(
       ExtendedBlock b, DatanodeStorageInfo[] storages,

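Note: the invalidateBlock hunks are the behavioral core of this revert: the replica count moves back inside the method and the "only copy" guard returns. A condensed sketch of the restored flow, with names as in the diff and logging/null checks elided:

    private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn)
        throws IOException {
      DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
      // Recomputed here rather than passed in by the caller.
      NumberReplicas nr = countNodes(b.stored);
      if (nr.replicasOnStaleNodes() > 0) {
        postponeBlock(b.corrupted);          // stale block reports: decide later
        return false;
      } else if (nr.liveReplicas() >= 1) {
        addToInvalidates(b.corrupted, dn);   // safe: another live copy exists
        removeStoredBlock(b.stored, node);
        return true;
      } else {
        return false;                        // the only copy: never invalidate
      }
    }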
BlocksMap.java

@@ -117,7 +117,7 @@ BlockInfo addBlockCollection(BlockInfo b, BlockCollection bc) {
    * remove it from all data-node lists it belongs to;
    * and remove all data-node locations associated with the block.
    */
-  void removeBlock(BlockInfo block) {
+  void removeBlock(Block block) {
     BlockInfo blockInfo = blocks.remove(block);
     if (blockInfo == null)
       return;
@@ -190,7 +190,7 @@ boolean removeNode(Block b, DatanodeDescriptor node) {
 
     // remove block from the data-node list and the node from the block info
     boolean removed = node.removeBlock(info);
-    if (info.hasEmptyStorage()     // no datanodes left
+    if (info.getDatanode(0) == null // no datanodes left
         && info.isDeleted()) {     // does not belong to a file
       blocks.remove(b);            // remove block from the map
     }

ContiguousBlockStorageOp.java

@@ -45,12 +45,13 @@ private static int ensureCapacity(BlockInfo b, int num) {
     return last;
   }
 
-  static void addStorage(BlockInfo b, DatanodeStorageInfo storage) {
+  static boolean addStorage(BlockInfo b, DatanodeStorageInfo storage) {
     // find the last null node
     int lastNode = ensureCapacity(b, 1);
     b.setStorageInfo(lastNode, storage);
     b.setNext(lastNode, null);
     b.setPrevious(lastNode, null);
+    return true;
   }
 
   static boolean removeStorage(BlockInfo b,
@@ -102,8 +103,4 @@ static void replaceBlock(BlockInfo b, BlockInfo newBlock) {
           "newBlock already exists.");
     }
   }
-
-  static boolean hasEmptyStorage(BlockInfo b) {
-    return b.getStorageInfo(0) == null;
-  }
 }

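Note: the helper removed here and the check restored in BlocksMap.removeNode above are two spellings of the same slot-0 test on the triplets array. A small sketch of the assumed equivalence:

    // Helper removed by this revert:
    static boolean hasEmptyStorage(BlockInfo b) {
      return b.getStorageInfo(0) == null;   // no first storage => no storages
    }

    // Equivalent inline form restored in BlocksMap.removeNode:
    //   if (info.getDatanode(0) == null && info.isDeleted()) { ... }
    // getDatanode(0) resolves through getStorageInfo(0), so it is null
    // exactly when the first storage slot is empty.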
CorruptReplicasMap.java

@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.Server;
@@ -46,12 +46,8 @@ public static enum Reason {
     CORRUPTION_REPORTED  // client or datanode reported the corruption
   }
 
-  /**
-   * Used to track corrupted replicas (for contiguous block) or internal blocks
-   * (for striped block) and the corresponding DataNodes. For a striped block,
-   * the key here is the striped block group object stored in the blocksMap.
-   */
-  private final SortedMap<BlockInfo, Map<DatanodeDescriptor, Reason>> corruptReplicasMap = new TreeMap<>();
+  private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
+    new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
 
   /**
    * Mark the block belonging to datanode as corrupt.
@@ -61,21 +57,21 @@ public static enum Reason {
    * @param reason a textual reason (for logging purposes)
    * @param reasonCode the enum representation of the reason
    */
-  void addToCorruptReplicasMap(BlockInfo blk, DatanodeDescriptor dn,
+  void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
       String reason, Reason reasonCode) {
     Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
     if (nodes == null) {
-      nodes = new HashMap<>();
+      nodes = new HashMap<DatanodeDescriptor, Reason>();
       corruptReplicasMap.put(blk, nodes);
     }
 
     String reasonText;
     if (reason != null) {
       reasonText = " because " + reason;
     } else {
       reasonText = "";
     }
 
     if (!nodes.keySet().contains(dn)) {
       NameNode.blockStateChangeLog.info(
           "BLOCK NameSystem.addToCorruptReplicasMap: {} added as corrupt on "
@@ -96,7 +92,7 @@ void addToCorruptReplicasMap(BlockInfo blk, DatanodeDescriptor dn,
    *
    * @param blk Block to be removed
    */
-  void removeFromCorruptReplicasMap(BlockInfo blk) {
+  void removeFromCorruptReplicasMap(Block blk) {
     if (corruptReplicasMap != null) {
       corruptReplicasMap.remove(blk);
     }
@@ -109,13 +105,12 @@ void removeFromCorruptReplicasMap(BlockInfo blk) {
    * @return true if the removal is successful;
              false if the replica is not in the map
    */
-  boolean removeFromCorruptReplicasMap(BlockInfo blk,
-      DatanodeDescriptor datanode) {
+  boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
     return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
   }
 
-  boolean removeFromCorruptReplicasMap(BlockInfo blk,
-      DatanodeDescriptor datanode, Reason reason) {
+  boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
+      Reason reason) {
     Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
     if (datanodes==null)
       return false;
@@ -144,9 +139,11 @@ boolean removeFromCorruptReplicasMap(BlockInfo blk,
    * @param blk Block for which nodes are requested
    * @return collection of nodes. Null if does not exists
    */
-  Collection<DatanodeDescriptor> getNodes(BlockInfo blk) {
-    Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
-    return nodes != null ? nodes.keySet() : null;
+  Collection<DatanodeDescriptor> getNodes(Block blk) {
+    Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
+    if (nodes == null)
+      return null;
+    return nodes.keySet();
   }
 
   /**
@@ -156,12 +153,12 @@ Collection<DatanodeDescriptor> getNodes(BlockInfo blk) {
    * @param node DatanodeDescriptor which holds the replica
    * @return true if replica is corrupt, false if does not exists in this map
    */
-  boolean isReplicaCorrupt(BlockInfo blk, DatanodeDescriptor node) {
+  boolean isReplicaCorrupt(Block blk, DatanodeDescriptor node) {
     Collection<DatanodeDescriptor> nodes = getNodes(blk);
     return ((nodes != null) && (nodes.contains(node)));
   }
 
-  int numCorruptReplicas(BlockInfo blk) {
+  int numCorruptReplicas(Block blk) {
     Collection<DatanodeDescriptor> nodes = getNodes(blk);
     return (nodes == null) ? 0 : nodes.size();
   }
@@ -171,9 +168,9 @@ int size() {
   }
 
   /**
   * Return a range of corrupt replica block ids. Up to numExpectedBlocks
   * blocks starting at the next block after startingBlockId are returned
   * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
   * is null, up to numExpectedBlocks blocks are returned from the beginning.
   * If startingBlockId cannot be found, null is returned.
   *
@@ -184,39 +181,44 @@ int size() {
   * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
   *
   */
-  @VisibleForTesting
  long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
                                   Long startingBlockId) {
    if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
      return null;
    }
 
-    Iterator<BlockInfo> blockIt = corruptReplicasMap.keySet().iterator();
+    Iterator<Block> blockIt = corruptReplicasMap.keySet().iterator();
 
    // if the starting block id was specified, iterate over keys until
    // we find the matching block. If we find a matching block, break
    // to leave the iterator on the next block after the specified block.
    if (startingBlockId != null) {
      boolean isBlockFound = false;
      while (blockIt.hasNext()) {
-        BlockInfo b = blockIt.next();
+        Block b = blockIt.next();
        if (b.getBlockId() == startingBlockId) {
          isBlockFound = true;
          break;
        }
      }
 
      if (!isBlockFound) {
        return null;
      }
    }
 
-    ArrayList<Long> corruptReplicaBlockIds = new ArrayList<>();
+    ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();
 
    // append up to numExpectedBlocks blockIds to our list
    for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
      corruptReplicaBlockIds.add(blockIt.next().getBlockId());
    }
 
    long[] ret = new long[corruptReplicaBlockIds.size()];
    for(int i=0; i<ret.length; i++) {
      ret[i] = corruptReplicaBlockIds.get(i);
    }
 
    return ret;
  }
@@ -227,7 +229,7 @@ long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
   * @param node datanode that contains this corrupted replica
   * @return reason
   */
-  String getCorruptReason(BlockInfo block, DatanodeDescriptor node) {
+  String getCorruptReason(Block block, DatanodeDescriptor node) {
     Reason reason = null;
     if(corruptReplicasMap.containsKey(block)) {
       if (corruptReplicasMap.get(block).containsKey(node)) {

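Note: keying the map by Block again still resolves entries for stored BlockInfo objects because, as I understand HDFS's Block semantics, equality and TreeMap ordering are by block id alone, and BlockInfo extends Block. An illustrative sketch, not part of the patch:

    SortedMap<Block, Map<DatanodeDescriptor, Reason>> map =
        new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();

    // A datanode-reported Block and the namenode's stored BlockInfo compare
    // equal when their ids match, so either works as a lookup key.
    Block reported = new Block(42L, 0, 0);
    map.put(reported, new HashMap<DatanodeDescriptor, Reason>());
    boolean tracked = map.containsKey(new Block(42L, 0, 0));   // true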
FSDirWriteFileOp.java

@@ -71,7 +71,7 @@ class FSDirWriteFileOp {
   private FSDirWriteFileOp() {}
 
   static boolean unprotectedRemoveBlock(
       FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode,
-      BlockInfo block) throws IOException {
+      Block block) throws IOException {
     // modify file-> block and blocksMap
     // fileNode should be under construction
     BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block);
@@ -136,9 +136,7 @@ static void abandonBlock(
     fsd.writeLock();
     try {
       // Remove the block from the pending creates list
-      BlockInfo storedBlock = fsd.getBlockManager().getStoredBlock(localBlock);
-      if (storedBlock != null &&
-          !unprotectedRemoveBlock(fsd, src, iip, file, storedBlock)) {
+      if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) {
         return;
       }
     } finally {

FSEditLogLoader.java

@@ -1038,7 +1038,7 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
         throw new IOException("Trying to remove more than one block from file "
             + path);
       }
-      BlockInfo oldBlock = oldBlocks[oldBlocks.length - 1];
+      Block oldBlock = oldBlocks[oldBlocks.length - 1];
       boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock(
           fsDir, path, iip, file, oldBlock);
       if (!removed && !(op instanceof UpdateBlocksOp)) {

NamenodeFsck.java

@@ -265,8 +265,10 @@ public void blockIdCK(String blockId) {
       out.println("No. of corrupted Replica: " +
           numberReplicas.corruptReplicas());
       //record datanodes that have corrupted block replica
-      Collection<DatanodeDescriptor> corruptionRecord =
-          bm.getCorruptReplicas(blockInfo);
+      Collection<DatanodeDescriptor> corruptionRecord = null;
+      if (bm.getCorruptReplicas(block) != null) {
+        corruptionRecord = bm.getCorruptReplicas(block);
+      }
 
       //report block replicas status on datanodes
       for(int idx = (blockInfo.numNodes()-1); idx >= 0; idx--) {
@@ -275,7 +277,7 @@ public void blockIdCK(String blockId) {
             dn.getNetworkLocation() + " ");
         if (corruptionRecord != null && corruptionRecord.contains(dn)) {
           out.print(CORRUPT_STATUS+"\t ReasonCode: "+
-              bm.getCorruptReason(blockInfo, dn));
+            bm.getCorruptReason(block,dn));
         } else if (dn.isDecommissioned() ){
           out.print(DECOMMISSIONED_STATUS);
         } else if (dn.isDecommissionInProgress()) {
@@ -637,7 +639,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
           LightWeightLinkedSet<BlockInfo> blocksExcess =
               bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
           Collection<DatanodeDescriptor> corruptReplicas =
-              bm.getCorruptReplicas(storedBlock);
+              bm.getCorruptReplicas(block.getLocalBlock());
           sb.append("(");
           if (dnDesc.isDecommissioned()) {
             sb.append("DECOMMISSIONED)");
@@ -645,7 +647,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
             sb.append("DECOMMISSIONING)");
           } else if (corruptReplicas != null && corruptReplicas.contains(dnDesc)) {
             sb.append("CORRUPT)");
-          } else if (blocksExcess != null && blocksExcess.contains(storedBlock)) {
+          } else if (blocksExcess != null && blocksExcess.contains(block.getLocalBlock())) {
             sb.append("EXCESS)");
           } else if (dnDesc.isStale(this.staleInterval)) {
             sb.append("STALE_NODE)");

DFSTestUtil.java

@@ -574,8 +574,7 @@ public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
       throws TimeoutException, InterruptedException {
     int count = 0;
     final int ATTEMPTS = 50;
-    int repls = BlockManagerTestUtil.numCorruptReplicas(ns.getBlockManager(),
-        b.getLocalBlock());
+    int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
     while (repls != corruptRepls && count < ATTEMPTS) {
       try {
         IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
@@ -587,8 +586,7 @@ public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
       count++;
       // check more often so corrupt block reports are not easily missed
       for (int i = 0; i < 10; i++) {
-        repls = BlockManagerTestUtil.numCorruptReplicas(ns.getBlockManager(),
-            b.getLocalBlock());
+        repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
         Thread.sleep(100);
         if (repls == corruptRepls) {
           break;

BlockManagerTestUtil.java

@@ -24,7 +24,6 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -88,7 +87,7 @@ private static int getNumberOfRacks(final BlockManager blockManager,
       final Block b) {
     final Set<String> rackSet = new HashSet<String>(0);
     final Collection<DatanodeDescriptor> corruptNodes =
-       getCorruptReplicas(blockManager).getNodes(blockManager.getStoredBlock(b));
+       getCorruptReplicas(blockManager).getNodes(b);
     for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) {
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
@@ -307,8 +306,4 @@ public static void recheckDecommissionState(DatanodeManager dm)
       throws ExecutionException, InterruptedException {
     dm.getDecomManager().runMonitor();
   }
-
-  public static int numCorruptReplicas(BlockManager bm, Block block) {
-    return bm.corruptReplicas.numCorruptReplicas(bm.getStoredBlock(block));
-  }
 }

TestBlockInfo.java

@@ -63,7 +63,9 @@ public void testAddStorage() throws Exception {
 
     final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
 
-    blockInfo.addStorage(storage, blockInfo);
+    boolean added = blockInfo.addStorage(storage, blockInfo);
 
+    Assert.assertTrue(added);
     Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
   }
 
@@ -71,7 +73,7 @@ public void testAddStorage() throws Exception {
   public void testCopyConstructor() {
     BlockInfo old = new BlockInfoContiguous((short) 3);
     try {
-      BlockInfo copy = new BlockInfoContiguous(old);
+      BlockInfo copy = new BlockInfoContiguous((BlockInfoContiguous)old);
       assertEquals(old.getBlockCollection(), copy.getBlockCollection());
       assertEquals(old.getCapacity(), copy.getCapacity());
     } catch (Exception e) {
@@ -108,8 +110,8 @@ public void testBlockListMoveToHead() throws Exception {
     final int MAX_BLOCKS = 10;
 
     DatanodeStorageInfo dd = DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1");
-    ArrayList<Block> blockList = new ArrayList<>(MAX_BLOCKS);
-    ArrayList<BlockInfo> blockInfoList = new ArrayList<>();
+    ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
+    ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
     int headIndex;
     int curIndex;

TestBlockManager.java

@@ -509,7 +509,7 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
         + " even if all available source nodes have reached their replication"
         + " limits below the hard limit.",
         bm.chooseSourceDatanode(
-            bm.getStoredBlock(aBlock),
+            aBlock,
             cntNodes,
             liveNodes,
             new NumberReplicas(),
@@ -519,7 +519,7 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
         + " replication since all available source nodes have reached"
         + " their replication limits.",
         bm.chooseSourceDatanode(
-            bm.getStoredBlock(aBlock),
+            aBlock,
             cntNodes,
             liveNodes,
             new NumberReplicas(),
@@ -532,7 +532,7 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
     assertNull("Does not choose a source node for a highest-priority"
         + " replication when all available nodes exceed the hard limit.",
         bm.chooseSourceDatanode(
-            bm.getStoredBlock(aBlock),
+            aBlock,
             cntNodes,
             liveNodes,
             new NumberReplicas(),
@@ -558,7 +558,7 @@ public void testFavorDecomUntilHardLimit() throws Exception {
         + " if all available source nodes have reached their replication"
         + " limits below the hard limit.",
         bm.chooseSourceDatanode(
-            bm.getStoredBlock(aBlock),
+            aBlock,
             cntNodes,
             liveNodes,
             new NumberReplicas(),
@@ -572,7 +572,7 @@ public void testFavorDecomUntilHardLimit() throws Exception {
     assertNull("Does not choose a source decommissioning node for a normal"
         + " replication when all available nodes exceed the hard limit.",
         bm.chooseSourceDatanode(
-            bm.getStoredBlock(aBlock),
+            aBlock,
             cntNodes,
             liveNodes,
             new NumberReplicas(),

TestCorruptReplicaInfo.java

@@ -48,19 +48,20 @@ public class TestCorruptReplicaInfo {
   private static final Log LOG =
       LogFactory.getLog(TestCorruptReplicaInfo.class);
 
-  private final Map<Long, BlockInfo> block_map = new HashMap<>();
+  private final Map<Long, Block> block_map =
+      new HashMap<Long, Block>();
 
   // Allow easy block creation by block id
   // Return existing block if one with same block id already exists
-  private BlockInfo getBlock(Long block_id) {
+  private Block getBlock(Long block_id) {
     if (!block_map.containsKey(block_id)) {
-      block_map.put(block_id,
-          new BlockInfoContiguous(new Block(block_id, 0, 0), (short) 1));
+      block_map.put(block_id, new Block(block_id,0,0));
     }
     return block_map.get(block_id);
   }
 
-  private BlockInfo getBlock(int block_id) {
+  private Block getBlock(int block_id) {
     return getBlock((long)block_id);
   }
 
@@ -81,7 +82,7 @@ public void testCorruptReplicaInfo() throws IOException,
       // create a list of block_ids. A list is used to allow easy validation of the
       // output of getCorruptReplicaBlockIds
       int NUM_BLOCK_IDS = 140;
-      List<Long> block_ids = new LinkedList<>();
+      List<Long> block_ids = new LinkedList<Long>();
       for (int i=0;i<NUM_BLOCK_IDS;i++) {
         block_ids.add((long)i);
       }
@@ -129,7 +130,7 @@ public void testCorruptReplicaInfo() throws IOException,
   }
 
   private static void addToCorruptReplicasMap(CorruptReplicasMap crm,
-      BlockInfo blk, DatanodeDescriptor dn) {
+      Block blk, DatanodeDescriptor dn) {
     crm.addToCorruptReplicasMap(blk, dn, "TEST", Reason.NONE);
   }
 }