HDFS-9775. Erasure Coding : Rename BlockRecoveryWork to BlockReconstructionWork. Contributed by Rakesh R.

Change-Id: I6dfc8efd94fa2bbb4eec0e4730a5a4f92c8a5519
This commit is contained in:
Zhe Zhang 2016-02-09 14:42:49 -08:00
parent 401ae4ecdb
commit a0fb2eff9b
8 changed files with 47 additions and 39 deletions

View File

@ -921,6 +921,9 @@ Trunk (Unreleased)
HDFS-9658. Erasure Coding: allow to use multiple EC policies in striping HDFS-9658. Erasure Coding: allow to use multiple EC policies in striping
related tests. (Rui Li via zhz) related tests. (Rui Li via zhz)
HDFS-9775. Erasure Coding : Rename BlockRecoveryWork to
BlockReconstructionWork. (Rakesh R via zhz)
Release 2.9.0 - UNRELEASED Release 2.9.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1450,7 +1450,7 @@ public class BlockManager implements BlockStatsMXBean {
} }
/** /**
* Scan blocks in {@link #neededReplications} and assign recovery * Scan blocks in {@link #neededReplications} and assign reconstruction
* (replication or erasure coding) work to data-nodes they belong to. * (replication or erasure coding) work to data-nodes they belong to.
* *
* The number of process blocks equals either twice the number of live * The number of process blocks equals either twice the number of live
@ -1458,7 +1458,7 @@ public class BlockManager implements BlockStatsMXBean {
* *
* @return number of blocks scheduled for replication during this iteration. * @return number of blocks scheduled for replication during this iteration.
*/ */
int computeBlockRecoveryWork(int blocksToProcess) { int computeBlockReconstructionWork(int blocksToProcess) {
List<List<BlockInfo>> blocksToReplicate = null; List<List<BlockInfo>> blocksToReplicate = null;
namesystem.writeLock(); namesystem.writeLock();
try { try {
@ -1468,30 +1468,33 @@ public class BlockManager implements BlockStatsMXBean {
} finally { } finally {
namesystem.writeUnlock(); namesystem.writeUnlock();
} }
return computeRecoveryWorkForBlocks(blocksToReplicate); return computeReconstructionWorkForBlocks(blocksToReplicate);
} }
/** /**
* Recover a set of blocks to full strength through replication or * Reconstruct a set of blocks to full strength through replication or
* erasure coding * erasure coding
* *
* @param blocksToRecover blocks to be recovered, for each priority * @param blocksToReconstruct blocks to be reconstructed, for each priority
* @return the number of blocks scheduled for replication * @return the number of blocks scheduled for replication
*/ */
@VisibleForTesting @VisibleForTesting
int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) { int computeReconstructionWorkForBlocks(
List<List<BlockInfo>> blocksToReconstruct) {
int scheduledWork = 0; int scheduledWork = 0;
List<BlockRecoveryWork> recovWork = new LinkedList<>(); List<BlockReconstructionWork> reconWork = new LinkedList<>();
// Step 1: categorize at-risk blocks into replication and EC tasks // Step 1: categorize at-risk blocks into replication and EC tasks
namesystem.writeLock(); namesystem.writeLock();
try { try {
synchronized (neededReplications) { synchronized (neededReplications) {
for (int priority = 0; priority < blocksToRecover.size(); priority++) { for (int priority = 0; priority < blocksToReconstruct
for (BlockInfo block : blocksToRecover.get(priority)) { .size(); priority++) {
BlockRecoveryWork rw = scheduleRecovery(block, priority); for (BlockInfo block : blocksToReconstruct.get(priority)) {
BlockReconstructionWork rw = scheduleReconstruction(block,
priority);
if (rw != null) { if (rw != null) {
recovWork.add(rw); reconWork.add(rw);
} }
} }
} }
@ -1500,9 +1503,9 @@ public class BlockManager implements BlockStatsMXBean {
namesystem.writeUnlock(); namesystem.writeUnlock();
} }
// Step 2: choose target nodes for each recovery task // Step 2: choose target nodes for each reconstruction task
final Set<Node> excludedNodes = new HashSet<>(); final Set<Node> excludedNodes = new HashSet<>();
for(BlockRecoveryWork rw : recovWork){ for(BlockReconstructionWork rw : reconWork){
// Exclude all of the containing nodes from being targets. // Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes. // This list includes decommissioning or corrupt nodes.
excludedNodes.clear(); excludedNodes.clear();
@ -1521,7 +1524,7 @@ public class BlockManager implements BlockStatsMXBean {
// Step 3: add tasks to the DN // Step 3: add tasks to the DN
namesystem.writeLock(); namesystem.writeLock();
try { try {
for(BlockRecoveryWork rw : recovWork){ for(BlockReconstructionWork rw : reconWork){
final DatanodeStorageInfo[] targets = rw.getTargets(); final DatanodeStorageInfo[] targets = rw.getTargets();
if(targets == null || targets.length == 0){ if(targets == null || targets.length == 0){
rw.resetTargets(); rw.resetTargets();
@ -1529,7 +1532,7 @@ public class BlockManager implements BlockStatsMXBean {
} }
synchronized (neededReplications) { synchronized (neededReplications) {
if (validateRecoveryWork(rw)) { if (validateReconstructionWork(rw)) {
scheduledWork++; scheduledWork++;
} }
} }
@ -1540,7 +1543,7 @@ public class BlockManager implements BlockStatsMXBean {
if (blockLog.isDebugEnabled()) { if (blockLog.isDebugEnabled()) {
// log which blocks have been scheduled for replication // log which blocks have been scheduled for replication
for(BlockRecoveryWork rw : recovWork){ for(BlockReconstructionWork rw : reconWork){
DatanodeStorageInfo[] targets = rw.getTargets(); DatanodeStorageInfo[] targets = rw.getTargets();
if (targets != null && targets.length != 0) { if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)"); StringBuilder targetList = new StringBuilder("datanode(s)");
@ -1567,7 +1570,8 @@ public class BlockManager implements BlockStatsMXBean {
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block)); (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
} }
private BlockRecoveryWork scheduleRecovery(BlockInfo block, int priority) { private BlockReconstructionWork scheduleReconstruction(BlockInfo block,
int priority) {
// block should belong to a file // block should belong to a file
BlockCollection bc = getBlockCollection(block); BlockCollection bc = getBlockCollection(block);
// abandoned block or block reopened for append // abandoned block or block reopened for append
@ -1589,8 +1593,8 @@ public class BlockManager implements BlockStatsMXBean {
containingNodes, liveReplicaNodes, numReplicas, containingNodes, liveReplicaNodes, numReplicas,
liveBlockIndices, priority); liveBlockIndices, priority);
if(srcNodes == null || srcNodes.length == 0) { if(srcNodes == null || srcNodes.length == 0) {
// block can not be recovered from any node // block can not be reconstructed from any node
LOG.debug("Block " + block + " cannot be recovered " + LOG.debug("Block " + block + " cannot be reconstructed " +
"from any node"); "from any node");
return null; return null;
} }
@ -1618,7 +1622,7 @@ public class BlockManager implements BlockStatsMXBean {
if (block.isStriped()) { if (block.isStriped()) {
if (pendingNum > 0) { if (pendingNum > 0) {
// Wait the previous recovery to finish. // Wait the previous reconstruction to finish.
return null; return null;
} }
byte[] indices = new byte[liveBlockIndices.size()]; byte[] indices = new byte[liveBlockIndices.size()];
@ -1635,7 +1639,7 @@ public class BlockManager implements BlockStatsMXBean {
} }
} }
private boolean validateRecoveryWork(BlockRecoveryWork rw) { private boolean validateReconstructionWork(BlockReconstructionWork rw) {
BlockInfo block = rw.getBlock(); BlockInfo block = rw.getBlock();
int priority = rw.getPriority(); int priority = rw.getPriority();
// Recheck since global lock was released // Recheck since global lock was released
@ -1672,11 +1676,12 @@ public class BlockManager implements BlockStatsMXBean {
} }
} }
// Add block to the to be recovered list // Add block to the to be reconstructed list
if (block.isStriped()) { if (block.isStriped()) {
assert rw instanceof ErasureCodingWork; assert rw instanceof ErasureCodingWork;
assert rw.getTargets().length > 0; assert rw.getTargets().length > 0;
assert pendingNum == 0: "Should wait the previous recovery to finish"; assert pendingNum == 0 : "Should wait the previous reconstruction"
+ " to finish";
String src = getBlockCollection(block).getName(); String src = getBlockCollection(block).getName();
ErasureCodingPolicy ecPolicy = null; ErasureCodingPolicy ecPolicy = null;
try { try {
@ -1687,7 +1692,7 @@ public class BlockManager implements BlockStatsMXBean {
} }
if (ecPolicy == null) { if (ecPolicy == null) {
blockLog.warn("No erasure coding policy found for the file {}. " blockLog.warn("No erasure coding policy found for the file {}. "
+ "So cannot proceed for recovery", src); + "So cannot proceed for reconstruction", src);
// TODO: we may have to revisit later for what we can do better to // TODO: we may have to revisit later for what we can do better to
// handle this case. // handle this case.
return false; return false;
@ -4239,7 +4244,7 @@ public class BlockManager implements BlockStatsMXBean {
final int nodesToProcess = (int) Math.ceil(numlive final int nodesToProcess = (int) Math.ceil(numlive
* this.blocksInvalidateWorkPct); * this.blocksInvalidateWorkPct);
int workFound = this.computeBlockRecoveryWork(blocksToProcess); int workFound = this.computeBlockReconstructionWork(blocksToProcess);
// Update counters // Update counters
namesystem.writeLock(); namesystem.writeLock();

View File

@ -25,17 +25,17 @@ import java.util.Set;
/** /**
* This class is used internally by * This class is used internally by
* {@link BlockManager#computeRecoveryWorkForBlocks} to represent a task to * {@link BlockManager#computeReconstructionWorkForBlocks} to represent a
* recover a block through replication or erasure coding. Recovery is done by * task to reconstruct a block through replication or erasure coding.
* transferring data from srcNodes to targets * Reconstruction is done by transferring data from srcNodes to targets
*/ */
abstract class BlockRecoveryWork { abstract class BlockReconstructionWork {
private final BlockInfo block; private final BlockInfo block;
private final BlockCollection bc; private final BlockCollection bc;
/** /**
* An erasure coding recovery task has multiple source nodes. * An erasure coding reconstruction task has multiple source nodes.
* A replication task only has 1 source node, stored on top of the array * A replication task only has 1 source node, stored on top of the array
*/ */
private final DatanodeDescriptor[] srcNodes; private final DatanodeDescriptor[] srcNodes;
@ -48,7 +48,7 @@ abstract class BlockRecoveryWork {
private DatanodeStorageInfo[] targets; private DatanodeStorageInfo[] targets;
private final int priority; private final int priority;
public BlockRecoveryWork(BlockInfo block, public BlockReconstructionWork(BlockInfo block,
BlockCollection bc, BlockCollection bc,
DatanodeDescriptor[] srcNodes, DatanodeDescriptor[] srcNodes,
List<DatanodeDescriptor> containingNodes, List<DatanodeDescriptor> containingNodes,

View File

@ -608,7 +608,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
BlockECReconstructionInfo task = new BlockECReconstructionInfo(block, BlockECReconstructionInfo task = new BlockECReconstructionInfo(block,
sources, targets, liveBlockIndices, ecPolicy); sources, targets, liveBlockIndices, ecPolicy);
erasurecodeBlocks.offer(task); erasurecodeBlocks.offer(task);
BlockManager.LOG.debug("Adding block recovery task " + task + "to " BlockManager.LOG.debug("Adding block reconstruction task " + task + "to "
+ getName() + ", current queue size is " + erasurecodeBlocks.size()); + getName() + ", current queue size is " + erasurecodeBlocks.size());
} }

View File

@ -22,7 +22,7 @@ import org.apache.hadoop.net.Node;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
class ErasureCodingWork extends BlockRecoveryWork { class ErasureCodingWork extends BlockReconstructionWork {
private final byte[] liveBlockIndicies; private final byte[] liveBlockIndicies;
public ErasureCodingWork(BlockInfo block, public ErasureCodingWork(BlockInfo block,

View File

@ -19,11 +19,10 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
class ReplicationWork extends BlockRecoveryWork { class ReplicationWork extends BlockReconstructionWork {
public ReplicationWork(BlockInfo block, BlockCollection bc, public ReplicationWork(BlockInfo block, BlockCollection bc,
DatanodeDescriptor[] srcNodes, List<DatanodeDescriptor> containingNodes, DatanodeDescriptor[] srcNodes, List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired, List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired,
@ -33,7 +32,8 @@ class ReplicationWork extends BlockRecoveryWork {
assert getSrcNodes().length == 1 : assert getSrcNodes().length == 1 :
"There should be exactly 1 source node that have been selected"; "There should be exactly 1 source node that have been selected";
getSrcNodes()[0].incrementPendingReplicationWithoutTargets(); getSrcNodes()[0].incrementPendingReplicationWithoutTargets();
BlockManager.LOG.debug("Creating a ReplicationWork to recover " + block); BlockManager.LOG
.debug("Creating a ReplicationWork to reconstruct " + block);
} }
@Override @Override

View File

@ -163,7 +163,7 @@ public class BlockManagerTestUtil {
*/ */
public static int computeAllPendingWork(BlockManager bm) { public static int computeAllPendingWork(BlockManager bm) {
int work = computeInvalidationWork(bm); int work = computeInvalidationWork(bm);
work += bm.computeBlockRecoveryWork(Integer.MAX_VALUE); work += bm.computeBlockReconstructionWork(Integer.MAX_VALUE);
return work; return work;
} }

View File

@ -540,8 +540,8 @@ public class TestBlockManager {
assertEquals("Block not initially pending replication", 0, assertEquals("Block not initially pending replication", 0,
bm.pendingReplications.getNumReplicas(block)); bm.pendingReplications.getNumReplicas(block));
assertEquals( assertEquals(
"computeBlockRecoveryWork should indicate replication is needed", 1, "computeBlockReconstructionWork should indicate replication is needed",
bm.computeRecoveryWorkForBlocks(list_all)); 1, bm.computeReconstructionWorkForBlocks(list_all));
assertTrue("replication is pending after work is computed", assertTrue("replication is pending after work is computed",
bm.pendingReplications.getNumReplicas(block) > 0); bm.pendingReplications.getNumReplicas(block) > 0);