HDFS-9869. Erasure Coding: Rename replication-based names in BlockManager to more generic [part-2]. Contributed by Rakesh R.

Zhe Zhang 2016-04-25 22:01:54 -07:00
parent 8eadd7145e
commit 5865fe2bf0
30 changed files with 253 additions and 241 deletions

View File

@ -45,7 +45,8 @@ The following table lists the configuration property names that are deprecated i
| dfs.replication.considerLoad | dfs.namenode.replication.considerLoad |
| dfs.replication.interval | dfs.namenode.replication.interval |
| dfs.replication.min | dfs.namenode.replication.min |
| dfs.replication.pending.timeout.sec | dfs.namenode.replication.pending.timeout-sec |
| dfs.replication.pending.timeout.sec | dfs.namenode.reconstruction.pending.timeout-sec |
| dfs.namenode.replication.pending.timeout-sec | dfs.namenode.reconstruction.pending.timeout-sec |
| dfs.safemode.extension | dfs.namenode.safemode.extension |
| dfs.safemode.threshold.pct | dfs.namenode.safemode.threshold-pct |
| dfs.secondary.http.address | dfs.namenode.secondary.http-address |

View File

@ -120,7 +120,9 @@ public class HdfsConfiguration extends Configuration {
new DeprecationDelta("dfs.replication.min",
DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY),
new DeprecationDelta("dfs.replication.pending.timeout.sec",
DeprecatedKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY),
DeprecatedKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY),
new DeprecationDelta("dfs.namenode.replication.pending.timeout-sec",
DeprecatedKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY),
new DeprecationDelta("dfs.max-repl-streams",
DeprecatedKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY),
new DeprecationDelta("dfs.permissions",

View File

@ -216,8 +216,8 @@ public interface HdfsClientConfigKeys {
String DFS_NAMENODE_REPLICATION_INTERVAL_KEY =
"dfs.namenode.replication.interval";
String DFS_NAMENODE_REPLICATION_MIN_KEY = "dfs.namenode.replication.min";
String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY =
"dfs.namenode.replication.pending.timeout-sec";
String DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY =
"dfs.namenode.reconstruction.pending.timeout-sec";
String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY =
"dfs.namenode.replication.max-streams";
String DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";

View File

@ -213,9 +213,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_STRIPE_MIN_DEFAULT = 1;
public static final String DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY =
"dfs.namenode.safemode.replication.min";
public static final String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY;
public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
public static final String DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY =
"dfs.namenode.reconstruction.pending.timeout-sec";
public static final int DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY;
public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;

View File

@ -147,7 +147,7 @@ public class BlockManager implements BlockStatsMXBean {
private final PendingDataNodeMessages pendingDNMessages =
new PendingDataNodeMessages();
private volatile long pendingReplicationBlocksCount = 0L;
private volatile long pendingReconstructionBlocksCount = 0L;
private volatile long corruptReplicaBlocksCount = 0L;
private volatile long lowRedundancyBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;
@ -161,8 +161,8 @@ public class BlockManager implements BlockStatsMXBean {
private ObjectName mxBeanName;
/** Used by metrics */
public long getPendingReplicationBlocksCount() {
return pendingReplicationBlocksCount;
public long getPendingReconstructionBlocksCount() {
return pendingReconstructionBlocksCount;
}
/** Used by metrics */
public long getUnderReplicatedBlocksCount() {
@ -186,7 +186,7 @@ public class BlockManager implements BlockStatsMXBean {
}
/** Used by metrics */
public long getExcessBlocksCount() {
return excessReplicas.size();
return excessRedundancyMap.size();
}
/** Used by metrics */
public long getPostponedMisreplicatedBlocksCount() {
@ -246,7 +246,8 @@ public class BlockManager implements BlockStatsMXBean {
* Maps a StorageID to the set of blocks that are "extra" for this
* DataNode. We'll eventually remove these extras.
*/
private final ExcessReplicaMap excessReplicas = new ExcessReplicaMap();
private final ExcessRedundancyMap excessRedundancyMap =
new ExcessRedundancyMap();
/**
* Store set of Blocks that need to be replicated 1 or more times.
@ -256,7 +257,7 @@ public class BlockManager implements BlockStatsMXBean {
new LowRedundancyBlocks();
@VisibleForTesting
final PendingReplicationBlocks pendingReplications;
final PendingReconstructionBlocks pendingReconstruction;
/** The maximum number of replicas allowed for a block */
public final short maxReplication;
@ -352,9 +353,10 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.getNetworkTopology(),
datanodeManager.getHost2DatanodeMap());
storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
pendingReconstruction = new PendingReconstructionBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
* 1000L);
blockTokenSecretManager = createBlockTokenSecretManager(conf);
@ -542,7 +544,7 @@ public class BlockManager implements BlockStatsMXBean {
}
public void activate(Configuration conf, long blockTotal) {
pendingReplications.start();
pendingReconstruction.start();
datanodeManager.activate(conf);
this.replicationThread.setName("ReplicationMonitor");
this.replicationThread.start();
@ -565,7 +567,7 @@ public class BlockManager implements BlockStatsMXBean {
} catch (InterruptedException ie) {
}
datanodeManager.close();
pendingReplications.stop();
pendingReconstruction.stop();
blocksMap.close();
}
@ -604,8 +606,8 @@ public class BlockManager implements BlockStatsMXBean {
dumpBlockMeta(block, out);
}
// Dump blocks from pendingReplication
pendingReplications.metaSave(out);
// Dump blocks from pendingReconstruction
pendingReconstruction.metaSave(out);
// Dump blocks that are waiting to be deleted
invalidateBlocks.dump(out);
@ -765,7 +767,7 @@ public class BlockManager implements BlockStatsMXBean {
/**
* If IBR is not sent from expected locations yet, add the datanodes to
* pendingReplications in order to keep ReplicationMonitor from scheduling
* pendingReconstruction in order to keep ReplicationMonitor from scheduling
* the block.
*/
public void addExpectedReplicasToPending(BlockInfo blk) {
@ -780,7 +782,7 @@ public class BlockManager implements BlockStatsMXBean {
pendingNodes.add(dnd);
}
}
pendingReplications.increment(blk,
pendingReconstruction.increment(blk,
pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
}
}
@ -866,7 +868,7 @@ public class BlockManager implements BlockStatsMXBean {
neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
replicas.readOnlyReplicas(),
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
pendingReplications.remove(lastBlock);
pendingReconstruction.remove(lastBlock);
// remove this block from the list of pending blocks to be deleted.
for (DatanodeStorageInfo storage : targets) {
@ -1435,7 +1437,7 @@ public class BlockManager implements BlockStatsMXBean {
void updateState() {
pendingReplicationBlocksCount = pendingReplications.size();
pendingReconstructionBlocksCount = pendingReconstruction.size();
lowRedundancyBlocksCount = neededReconstruction.size();
corruptReplicaBlocksCount = corruptReplicas.size();
}
@ -1578,8 +1580,8 @@ public class BlockManager implements BlockStatsMXBean {
}
blockLog.debug(
"BLOCK* neededReconstruction = {} pendingReplications = {}",
neededReconstruction.size(), pendingReplications.size());
"BLOCK* neededReconstruction = {} pendingReconstruction = {}",
neededReconstruction.size(), pendingReconstruction.size());
}
return scheduledWork;
@ -1622,7 +1624,7 @@ public class BlockManager implements BlockStatsMXBean {
// not included in the numReplicas.liveReplicas() count
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
int pendingNum = pendingReplications.getNumReplicas(block);
int pendingNum = pendingReconstruction.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
requiredReplication)) {
neededReconstruction.remove(block, priority);
@ -1690,7 +1692,7 @@ public class BlockManager implements BlockStatsMXBean {
// do not schedule more if enough replicas is already pending
final short requiredReplication = getExpectedReplicaNum(block);
NumberReplicas numReplicas = countNodes(block);
final int pendingNum = pendingReplications.getNumReplicas(block);
final int pendingNum = pendingReconstruction.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
requiredReplication)) {
neededReconstruction.remove(block, priority);
@ -1718,8 +1720,8 @@ public class BlockManager implements BlockStatsMXBean {
// Move the block-replication into a "pending" state.
// The reason we use 'pending' is so we can retry
// replications that fail after an appropriate amount of time.
pendingReplications.increment(block,
// reconstructions that fail after an appropriate amount of time.
pendingReconstruction.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors(targets));
blockLog.debug("BLOCK* block {} is moved from neededReplications to "
+ "pendingReplications", block);
@ -1907,11 +1909,11 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
* If there were any replication requests that timed out, reap them
* and put them back into the neededReplication queue
* If there were any reconstruction requests that timed out, reap them
* and put them back into the neededReconstruction queue
*/
private void processPendingReplications() {
BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks();
private void processPendingReconstructions() {
BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
if (timedOutItems != null) {
namesystem.writeLock();
try {
@ -2890,7 +2892,7 @@ public class BlockManager implements BlockStatsMXBean {
// Now check for completion of blocks and safe block count
NumberReplicas num = countNodes(storedBlock);
int numLiveReplicas = num.liveReplicas();
int pendingNum = pendingReplications.getNumReplicas(storedBlock);
int pendingNum = pendingReconstruction.getNumReplicas(storedBlock);
int numCurrentReplica = numLiveReplicas + pendingNum;
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
@ -3203,8 +3205,8 @@ public class BlockManager implements BlockStatsMXBean {
/**
* Find how many of the containing nodes are "extra", if any.
* If there are any extras, call chooseExcessReplicates() to
* mark them in the excessReplicateMap.
* If there are any extras, call chooseExcessRedundancies() to
* mark them in the excessRedundancyMap.
*/
private void processExtraRedundancyBlock(final BlockInfo block,
final short replication, final DatanodeDescriptor addedNode,
@ -3237,11 +3239,11 @@ public class BlockManager implements BlockStatsMXBean {
}
}
}
chooseExcessReplicates(nonExcess, block, replication, addedNode,
chooseExcessRedundancies(nonExcess, block, replication, addedNode,
delNodeHint);
}
private void chooseExcessReplicates(
private void chooseExcessRedundancies(
final Collection<DatanodeStorageInfo> nonExcess,
BlockInfo storedBlock, short replication,
DatanodeDescriptor addedNode,
@ -3250,19 +3252,19 @@ public class BlockManager implements BlockStatsMXBean {
// first form a rack to datanodes map and
BlockCollection bc = getBlockCollection(storedBlock);
if (storedBlock.isStriped()) {
chooseExcessReplicasStriped(bc, nonExcess, storedBlock, delNodeHint);
chooseExcessRedundancyStriped(bc, nonExcess, storedBlock, delNodeHint);
} else {
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
bc.getStoragePolicyID());
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
chooseExcessReplicasContiguous(nonExcess, storedBlock, replication,
chooseExcessRedundancyContiguous(nonExcess, storedBlock, replication,
addedNode, delNodeHint, excessTypes);
}
}
/**
* We want "replication" replicates for the block, but we now have too many.
* We want sufficient redundancy for the block, but we now have too many replicas.
* In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
*
* srcNodes.size() - dstNodes.size() == replication
@ -3275,7 +3277,7 @@ public class BlockManager implements BlockStatsMXBean {
* If no such a node is available,
* then pick a node with least free space
*/
private void chooseExcessReplicasContiguous(
private void chooseExcessRedundancyContiguous(
final Collection<DatanodeStorageInfo> nonExcess, BlockInfo storedBlock,
short replication, DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint, List<StorageType> excessTypes) {
@ -3284,7 +3286,7 @@ public class BlockManager implements BlockStatsMXBean {
.chooseReplicasToDelete(nonExcess, nonExcess, replication, excessTypes,
addedNode, delNodeHint);
for (DatanodeStorageInfo choosenReplica : replicasToDelete) {
processChosenExcessReplica(nonExcess, choosenReplica, storedBlock);
processChosenExcessRedundancy(nonExcess, choosenReplica, storedBlock);
}
}
@ -3297,7 +3299,7 @@ public class BlockManager implements BlockStatsMXBean {
* The block placement policy will make sure that the left internal blocks are
* spread across racks and also try hard to pick one with least free space.
*/
private void chooseExcessReplicasStriped(BlockCollection bc,
private void chooseExcessRedundancyStriped(BlockCollection bc,
final Collection<DatanodeStorageInfo> nonExcess,
BlockInfo storedBlock,
DatanodeDescriptor delNodeHint) {
@ -3325,7 +3327,7 @@ public class BlockManager implements BlockStatsMXBean {
if (delStorageHint != null) {
Integer index = storage2index.get(delStorageHint);
if (index != null && duplicated.get(index)) {
processChosenExcessReplica(nonExcess, delStorageHint, storedBlock);
processChosenExcessRedundancy(nonExcess, delStorageHint, storedBlock);
}
}
@ -3357,7 +3359,7 @@ public class BlockManager implements BlockStatsMXBean {
.chooseReplicasToDelete(nonExcess, candidates, (short) 1,
excessTypes, null, null);
for (DatanodeStorageInfo chosen : replicasToDelete) {
processChosenExcessReplica(nonExcess, chosen, storedBlock);
processChosenExcessRedundancy(nonExcess, chosen, storedBlock);
candidates.remove(chosen);
}
}
@ -3365,11 +3367,11 @@ public class BlockManager implements BlockStatsMXBean {
}
}
private void processChosenExcessReplica(
private void processChosenExcessRedundancy(
final Collection<DatanodeStorageInfo> nonExcess,
final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
nonExcess.remove(chosen);
excessReplicas.add(chosen.getDatanodeDescriptor(), storedBlock);
excessRedundancyMap.add(chosen.getDatanodeDescriptor(), storedBlock);
//
// The 'excessblocks' tracks blocks until we get confirmation
// that the datanode has deleted them; the only way we remove them
@ -3381,7 +3383,7 @@ public class BlockManager implements BlockStatsMXBean {
//
final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
blockLog.debug("BLOCK* chooseExcessReplicates: "
blockLog.debug("BLOCK* chooseExcessRedundancies: "
+ "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
}
@ -3433,7 +3435,7 @@ public class BlockManager implements BlockStatsMXBean {
updateNeededReconstructions(storedBlock, -1, 0);
}
excessReplicas.remove(node, storedBlock);
excessRedundancyMap.remove(node, storedBlock);
corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node);
}
}
@ -3504,7 +3506,7 @@ public class BlockManager implements BlockStatsMXBean {
//
BlockInfo storedBlock = getStoredBlock(block);
if (storedBlock != null) {
pendingReplications.decrement(storedBlock, node);
pendingReconstruction.decrement(storedBlock, node);
}
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
@ -3749,11 +3751,11 @@ public class BlockManager implements BlockStatsMXBean {
@VisibleForTesting
int getExcessSize4Testing(String dnUuid) {
return excessReplicas.getSize4Testing(dnUuid);
return excessRedundancyMap.getSize4Testing(dnUuid);
}
public boolean isExcess(DatanodeDescriptor dn, BlockInfo blk) {
return excessReplicas.contains(dn, blk);
return excessRedundancyMap.contains(dn, blk);
}
/**
@ -3813,7 +3815,7 @@ public class BlockManager implements BlockStatsMXBean {
}
updateState();
if (pendingReplicationBlocksCount == 0 &&
if (pendingReconstructionBlocksCount == 0 &&
lowRedundancyBlocksCount == 0) {
LOG.info("Node {} is dead and there are no low redundancy" +
" blocks or blocks pending reconstruction. Safe to decommission.",
@ -3860,8 +3862,8 @@ public class BlockManager implements BlockStatsMXBean {
block.setNumBytes(BlockCommand.NO_ACK);
addToInvalidates(block);
removeBlockFromMap(block);
// Remove the block from pendingReplications and neededReconstruction
pendingReplications.remove(block);
// Remove the block from pendingReconstruction and neededReconstruction
pendingReconstruction.remove(block);
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
if (postponedMisreplicatedBlocks.remove(block)) {
postponedMisreplicatedBlocksCount.decrementAndGet();
@ -3919,7 +3921,7 @@ public class BlockManager implements BlockStatsMXBean {
for (BlockInfo block : bc.getBlocks()) {
short expected = getExpectedReplicaNum(block);
final NumberReplicas n = countNodes(block);
final int pending = pendingReplications.getNumReplicas(block);
final int pending = pendingReconstruction.getNumReplicas(block);
if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
neededReconstruction.add(block, n.liveReplicas() + pending,
n.readOnlyReplicas(),
@ -4059,7 +4061,7 @@ public class BlockManager implements BlockStatsMXBean {
public void removeBlockFromMap(BlockInfo block) {
for(DatanodeStorageInfo info : blocksMap.getStorages(block)) {
excessReplicas.remove(info.getDatanodeDescriptor(), block);
excessRedundancyMap.remove(info.getDatanodeDescriptor(), block);
}
blocksMap.removeBlock(block);
@ -4110,7 +4112,7 @@ public class BlockManager implements BlockStatsMXBean {
// Process replication work only when active NN is out of safe mode.
if (isPopulatingReplQueues()) {
computeDatanodeWork();
processPendingReplications();
processPendingReconstructions();
rescanPostponedMisreplicatedBlocks();
}
Thread.sleep(replicationRecheckInterval);
@ -4258,8 +4260,8 @@ public class BlockManager implements BlockStatsMXBean {
*/
public void clearQueues() {
neededReconstruction.clear();
pendingReplications.clear();
excessReplicas.clear();
pendingReconstruction.clear();
excessRedundancyMap.clear();
invalidateBlocks.clear();
datanodeManager.clearPendingQueues();
postponedMisreplicatedBlocks.clear();
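
Several of the hunks above (around computeReconstructionWorkForBlocks and processExtraRedundancyBlock) gate further work on whether live replicas plus replicas already pending reconstruction meet the required count. A standalone, simplified sketch of that check, with hypothetical names; the real hasEnoughEffectiveReplicas additionally requires pending work in flight or a satisfied block placement policy, which is omitted here:

// Simplified sketch of the "enough effective replicas" test the diff relies on.
public final class EffectiveReplicaCheck {
  static boolean hasEnoughEffectiveReplicas(int liveReplicas, int pendingNum,
      int requiredReplication) {
    return liveReplicas + pendingNum >= requiredReplication;
  }

  public static void main(String[] args) {
    // 2 live replicas + 1 already pending reconstruction against a target of 3:
    // no further reconstruction work needs to be scheduled.
    System.out.println(hasEnoughEffectiveReplicas(2, 1, 3)); // true
  }
}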

View File

@ -541,7 +541,7 @@ public class DecommissionManager {
// pending
if (blockManager.isNeededReconstruction(block, liveReplicas)) {
if (!blockManager.neededReconstruction.contains(block) &&
blockManager.pendingReplications.getNumReplicas(block) == 0 &&
blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReconstruction.add(block,

View File

@ -28,22 +28,26 @@ import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
/**
* Maps a datnode to the set of excess replicas.
* Maps a datanode to the set of excess redundancy details.
*
* This class is thread safe.
*/
class ExcessReplicaMap {
class ExcessRedundancyMap {
public static final Logger blockLog = NameNode.blockStateChangeLog;
private final Map<String, LightWeightHashSet<BlockInfo>> map =new HashMap<>();
private final AtomicLong size = new AtomicLong(0L);
/** @return the number of replicas in this map. */
/**
* @return the number of redundancies in this map.
*/
long size() {
return size.get();
}
/** @return the number of replicas corresponding to the given datanode. */
/**
* @return the number of redundancies corresponding to the given datanode.
*/
@VisibleForTesting
synchronized int getSize4Testing(String dnUuid) {
final LightWeightHashSet<BlockInfo> set = map.get(dnUuid);
@ -56,7 +60,7 @@ class ExcessReplicaMap {
}
/**
* @return does this map contains a replica corresponding to the given
* @return does this map contain a redundancy corresponding to the given
* datanode and the given block?
*/
synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
@ -65,7 +69,9 @@ class ExcessReplicaMap {
}
/**
* Add the replica of the given block stored in the given datanode to the map.
* Add the redundancy of the given block stored in the given datanode to the
* map.
*
* @return true if the block is added.
*/
synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
@ -77,13 +83,15 @@ class ExcessReplicaMap {
final boolean added = set.add(blk);
if (added) {
size.incrementAndGet();
blockLog.debug("BLOCK* ExcessReplicaMap.add({}, {})", dn, blk);
blockLog.debug("BLOCK* ExcessRedundancyMap.add({}, {})", dn, blk);
}
return added;
}
/**
* Remove the replica corresponding to the given datanode and the given block.
* Remove the redundancy corresponding to the given datanode and the given
* block.
*
* @return true if the block is removed.
*/
synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
@ -95,7 +103,7 @@ class ExcessReplicaMap {
final boolean removed = set.remove(blk);
if (removed) {
size.decrementAndGet();
blockLog.debug("BLOCK* ExcessReplicaMap.remove({}, {})", dn, blk);
blockLog.debug("BLOCK* ExcessRedundancyMap.remove({}, {})", dn, blk);
if (set.isEmpty()) {
map.remove(dn.getDatanodeUuid());
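
The class above is essentially a synchronized map from a datanode UUID to the set of blocks that are excess on that node, with an O(1) size counter. A standalone, simplified sketch of the same pattern, assuming a generic block type and a hypothetical class name (the real implementation uses LightWeightHashSet and BlockInfo):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

class SimpleExcessRedundancyMap<B> {
  // datanode UUID -> blocks that are excess on that node
  private final Map<String, Set<B>> map = new HashMap<>();
  private final AtomicLong size = new AtomicLong(0L);

  long size() {
    return size.get();
  }

  synchronized boolean contains(String dnUuid, B blk) {
    Set<B> set = map.get(dnUuid);
    return set != null && set.contains(blk);
  }

  synchronized boolean add(String dnUuid, B blk) {
    Set<B> set = map.computeIfAbsent(dnUuid, k -> new HashSet<>());
    boolean added = set.add(blk);
    if (added) {
      size.incrementAndGet();
    }
    return added;
  }

  synchronized boolean remove(String dnUuid, B blk) {
    Set<B> set = map.get(dnUuid);
    if (set == null) {
      return false;
    }
    boolean removed = set.remove(blk);
    if (removed) {
      size.decrementAndGet();
      if (set.isEmpty()) {
        map.remove(dnUuid); // drop empty per-node sets, as the real map does
      }
    }
    return removed;
  }
}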

View File

@ -33,20 +33,20 @@ import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
/***************************************************
* PendingReplicationBlocks does the bookkeeping of all
* blocks that are getting replicated.
* PendingReconstructionBlocks does the bookkeeping of all
* blocks that gain stronger redundancy.
*
* It does the following:
* 1) record blocks that are getting replicated at this instant.
* 2) a coarse grain timer to track age of replication request
* 3) a thread that periodically identifies replication-requests
* 1) record blocks that gain stronger redundancy at this instant.
* 2) a coarse grain timer to track age of reconstruction request
* 3) a thread that periodically identifies reconstruction-requests
* that never made it.
*
***************************************************/
class PendingReplicationBlocks {
class PendingReconstructionBlocks {
private static final Logger LOG = BlockManager.LOG;
private final Map<BlockInfo, PendingBlockInfo> pendingReplications;
private final Map<BlockInfo, PendingBlockInfo> pendingReconstructions;
private final ArrayList<BlockInfo> timedOutItems;
Daemon timerThread = null;
private volatile boolean fsRunning = true;
@ -58,29 +58,29 @@ class PendingReplicationBlocks {
private long timeout = 5 * 60 * 1000;
private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000;
PendingReplicationBlocks(long timeoutPeriod) {
PendingReconstructionBlocks(long timeoutPeriod) {
if ( timeoutPeriod > 0 ) {
this.timeout = timeoutPeriod;
}
pendingReplications = new HashMap<>();
pendingReconstructions = new HashMap<>();
timedOutItems = new ArrayList<>();
}
void start() {
timerThread = new Daemon(new PendingReplicationMonitor());
timerThread = new Daemon(new PendingReconstructionMonitor());
timerThread.start();
}
/**
* Add a block to the list of pending Replications
* Add a block to the list of pending reconstructions
* @param block The corresponding block
* @param targets The DataNodes where replicas of the block should be placed
*/
void increment(BlockInfo block, DatanodeDescriptor... targets) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
synchronized (pendingReconstructions) {
PendingBlockInfo found = pendingReconstructions.get(block);
if (found == null) {
pendingReplications.put(block, new PendingBlockInfo(targets));
pendingReconstructions.put(block, new PendingBlockInfo(targets));
} else {
found.incrementReplicas(targets);
found.setTimeStamp();
@ -89,58 +89,58 @@ class PendingReplicationBlocks {
}
/**
* One replication request for this block has finished.
* Decrement the number of pending replication requests
* One reconstruction request for this block has finished.
* Decrement the number of pending reconstruction requests
* for this block.
*
* @param dn The DataNode that finishes the replication
*
* @param dn The DataNode that finishes the reconstruction
*/
void decrement(BlockInfo block, DatanodeDescriptor dn) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
synchronized (pendingReconstructions) {
PendingBlockInfo found = pendingReconstructions.get(block);
if (found != null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Removing pending replication for " + block);
}
LOG.debug("Removing pending reconstruction for {}", block);
found.decrementReplicas(dn);
if (found.getNumReplicas() <= 0) {
pendingReplications.remove(block);
pendingReconstructions.remove(block);
}
}
}
}
/**
* Remove the record about the given block from pendingReplications.
* @param block The given block whose pending replication requests need to be
* removed
* Remove the record about the given block from pending reconstructions.
*
* @param block
* The given block whose pending reconstruction requests need to be
* removed
*/
void remove(BlockInfo block) {
synchronized (pendingReplications) {
pendingReplications.remove(block);
synchronized (pendingReconstructions) {
pendingReconstructions.remove(block);
}
}
public void clear() {
synchronized (pendingReplications) {
pendingReplications.clear();
synchronized (pendingReconstructions) {
pendingReconstructions.clear();
timedOutItems.clear();
}
}
/**
* The total number of blocks that are undergoing replication
* The total number of blocks that are undergoing reconstruction.
*/
int size() {
return pendingReplications.size();
}
return pendingReconstructions.size();
}
/**
* How many copies of this block is pending replication?
* How many copies of this block are pending reconstruction?
*/
int getNumReplicas(BlockInfo block) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
synchronized (pendingReconstructions) {
PendingBlockInfo found = pendingReconstructions.get(block);
if (found != null) {
return found.getNumReplicas();
}
@ -149,8 +149,8 @@ class PendingReplicationBlocks {
}
/**
* Returns a list of blocks that have timed out their
* replication requests. Returns null if no blocks have
* Returns a list of blocks that have timed out their
* reconstruction requests. Returns null if no blocks have
* timed out.
*/
BlockInfo[] getTimedOutBlocks() {
@ -166,11 +166,11 @@ class PendingReplicationBlocks {
}
/**
* An object that contains information about a block that
* is being replicated. It records the timestamp when the
* system started replicating the most recent copy of this
* block. It also records the list of Datanodes where the
* replication requests are in progress.
* An object that contains information about a block that
* is being reconstructed. It records the timestamp when the
* system started reconstructing the most recent copy of this
* block. It also records the list of Datanodes where the
* reconstruction requests are in progress.
*/
static class PendingBlockInfo {
private long timeStamp;
@ -211,20 +211,18 @@ class PendingReplicationBlocks {
/*
* A periodic thread that scans for blocks that never finished
* their replication request.
* their reconstruction request.
*/
class PendingReplicationMonitor implements Runnable {
class PendingReconstructionMonitor implements Runnable {
@Override
public void run() {
while (fsRunning) {
long period = Math.min(DEFAULT_RECHECK_INTERVAL, timeout);
try {
pendingReplicationCheck();
pendingReconstructionCheck();
Thread.sleep(period);
} catch (InterruptedException ie) {
if(LOG.isDebugEnabled()) {
LOG.debug("PendingReplicationMonitor thread is interrupted.", ie);
}
LOG.debug("PendingReconstructionMonitor thread is interrupted.", ie);
}
}
}
@ -232,14 +230,12 @@ class PendingReplicationBlocks {
/**
* Iterate through all items and detect timed-out items
*/
void pendingReplicationCheck() {
synchronized (pendingReplications) {
void pendingReconstructionCheck() {
synchronized (pendingReconstructions) {
Iterator<Map.Entry<BlockInfo, PendingBlockInfo>> iter =
pendingReplications.entrySet().iterator();
pendingReconstructions.entrySet().iterator();
long now = monotonicNow();
if(LOG.isDebugEnabled()) {
LOG.debug("PendingReplicationMonitor checking Q");
}
LOG.debug("PendingReconstructionMonitor checking Q");
while (iter.hasNext()) {
Map.Entry<BlockInfo, PendingBlockInfo> entry = iter.next();
PendingBlockInfo pendingBlock = entry.getValue();
@ -248,7 +244,7 @@ class PendingReplicationBlocks {
synchronized (timedOutItems) {
timedOutItems.add(block);
}
LOG.warn("PendingReplicationMonitor timed out " + block);
LOG.warn("PendingReconstructionMonitor timed out " + block);
iter.remove();
}
}
@ -257,7 +253,7 @@ class PendingReplicationBlocks {
}
/*
* Shuts down the pending replication monitor thread.
* Shuts down the pending reconstruction monitor thread.
* Waits for the thread to exit.
*/
void stop() {
@ -274,16 +270,16 @@ class PendingReplicationBlocks {
* Iterate through all items and print them.
*/
void metaSave(PrintWriter out) {
synchronized (pendingReplications) {
out.println("Metasave: Blocks being replicated: " +
pendingReplications.size());
synchronized (pendingReconstructions) {
out.println("Metasave: Blocks being reconstructed: " +
pendingReconstructions.size());
for (Map.Entry<BlockInfo, PendingBlockInfo> entry :
pendingReplications.entrySet()) {
pendingReconstructions.entrySet()) {
PendingBlockInfo pendingBlock = entry.getValue();
Block block = entry.getKey();
out.println(block +
" StartTime: " + new Time(pendingBlock.timeStamp) +
" NumReplicaInProgress: " +
" NumReconstructInProgress: " +
pendingBlock.getNumReplicas());
}
}
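
The renamed class records, per block, a timestamp and the targets a reconstruction request was dispatched to; a periodic monitor reaps entries older than the configured timeout so that processPendingReconstructions can put them back into neededReconstruction. A standalone, simplified sketch of that bookkeeping, with hypothetical names and target tracking reduced to a count:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class SimplePendingReconstruction<B> {
  private static class Pending {
    long timeStamp = System.currentTimeMillis();
    int numTargets;
    Pending(int numTargets) { this.numTargets = numTargets; }
  }

  private final Map<B, Pending> pending = new HashMap<>();
  private final List<B> timedOutItems = new ArrayList<>();
  private final long timeoutMs;

  SimplePendingReconstruction(long timeoutMs) { this.timeoutMs = timeoutMs; }

  synchronized void increment(B block, int targets) {
    Pending found = pending.get(block);
    if (found == null) {
      pending.put(block, new Pending(targets));
    } else {
      found.numTargets += targets;
      found.timeStamp = System.currentTimeMillis(); // reset the age of the request
    }
  }

  synchronized void decrement(B block) {
    Pending found = pending.get(block);
    if (found != null && --found.numTargets <= 0) {
      pending.remove(block);
    }
  }

  // Reap entries whose reconstruction request never completed in time, so the
  // caller can re-queue them for new work.
  synchronized List<B> getTimedOutBlocks() {
    long now = System.currentTimeMillis();
    Iterator<Map.Entry<B, Pending>> iter = pending.entrySet().iterator();
    while (iter.hasNext()) {
      Map.Entry<B, Pending> e = iter.next();
      if (now - e.getValue().timeStamp > timeoutMs) {
        timedOutItems.add(e.getKey());
        iter.remove();
      }
    }
    List<B> out = new ArrayList<>(timedOutItems);
    timedOutItems.clear();
    return out;
  }
}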

View File

@ -472,8 +472,8 @@ public class BackupNode extends NameNode {
* {@link BlockManager.ReplicationMonitor} protected by SafeMode.
* {@link HeartbeatManager.Monitor} protected by SafeMode.
* {@link DecommissionManager.Monitor} need to prohibit refreshNodes().
* {@link PendingReplicationBlocks.PendingReplicationMonitor} harmless,
* because ReplicationMonitor is muted.
* {@link PendingReconstructionBlocks.PendingReconstructionMonitor}
* harmless, because ReplicationMonitor is muted.
*/
@Override
public void startActiveServices() throws IOException {

View File

@ -4469,7 +4469,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Override // FSNamesystemMBean
@Metric
public long getPendingReplicationBlocks() {
return blockManager.getPendingReplicationBlocksCount();
return blockManager.getPendingReconstructionBlocksCount();
}
@Override // FSNamesystemMBean

View File

@ -3790,10 +3790,10 @@
</property>
<property>
<name>dfs.namenode.replication.pending.timeout-sec</name>
<name>dfs.namenode.reconstruction.pending.timeout-sec</name>
<value>-1</value>
<description>
Timeout in seconds for block replication. If this value is 0 or less,
Timeout in seconds for block reconstruction. If this value is 0 or less,
then it will default to 5 minutes.
</description>
</property>

View File

@ -85,7 +85,7 @@ public class TestAppendSnapshotTruncate {
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, SHORT_HEARTBEAT);
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 1);
conf.setBoolean(ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
cluster = new MiniDFSCluster.Builder(conf)
.format(true)

View File

@ -286,7 +286,7 @@ public class TestDatanodeDeath {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 2);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes).build();
@ -342,7 +342,7 @@ public class TestDatanodeDeath {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 2);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
int myMaxNodes = 5;
System.out.println("SimpleTest starting with DataNode to Kill " +

View File

@ -114,7 +114,7 @@ public class TestDecommission {
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 4);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL);
writeConfigFile(hostsFile, null);

View File

@ -124,7 +124,7 @@ public class TestDecommissionWithStriped {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
BLOCKREPORT_INTERVAL_MSEC);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
4);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
NAMENODE_REPLICATION_INTERVAL);

View File

@ -481,7 +481,7 @@ public class TestFileAppend2 {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 2);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 30000);
conf.setInt(HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 30000);
conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50);

View File

@ -84,7 +84,7 @@ public class TestFileAppend4 {
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
// handle under-replicated blocks quickly (for replication asserts)
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5);
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
// handle failures in the DFSClient pipeline quickly

View File

@ -410,7 +410,7 @@ public class TestReplication {
LOG.info("Restarting minicluster after deleting a replica and corrupting 2 crcs");
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(numDataNodes));
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
conf.set(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.75f"); // only 3 copies exist
@ -507,7 +507,7 @@ public class TestReplication {
try {
Configuration conf = new HdfsConfiguration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
.storagesPerDatanode(1).build();
FileSystem fs = cluster.getFileSystem();
@ -687,7 +687,7 @@ public class TestReplication {
private long pendingReplicationCount(BlockManager bm) {
BlockManagerTestUtil.updateState(bm);
return bm.getPendingReplicationBlocksCount();
return bm.getPendingReconstructionBlocksCount();
}
private void assertNoReplicationWasPerformed(MiniDFSCluster cluster) {

View File

@ -39,7 +39,7 @@ public class TestSetrepIncreasing {
}
conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "" + fromREP);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
conf.set(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(10).build();
FileSystem fs = cluster.getFileSystem();
assertTrue("Not a HDFS: "+fs.getUri(), fs instanceof DistributedFileSystem);

View File

@ -537,15 +537,16 @@ public class TestBlockManager {
list_all.add(new ArrayList<BlockInfo>()); // for priority 0
list_all.add(list_p1); // for priority 1
assertEquals("Block not initially pending replication", 0,
bm.pendingReplications.getNumReplicas(block));
assertEquals("Block not initially pending reconstruction", 0,
bm.pendingReconstruction.getNumReplicas(block));
assertEquals(
"computeBlockReconstructionWork should indicate replication is needed",
"computeBlockReconstructionWork should indicate reconstruction is needed",
1, bm.computeReconstructionWorkForBlocks(list_all));
assertTrue("replication is pending after work is computed",
bm.pendingReplications.getNumReplicas(block) > 0);
assertTrue("reconstruction is pending after work is computed",
bm.pendingReconstruction.getNumReplicas(block) > 0);
LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
getAllPendingReconstruction();
assertEquals(1, repls.size());
Entry<DatanodeStorageInfo, BlockTargetPair> repl =
repls.entries().iterator().next();
@ -559,7 +560,7 @@ public class TestBlockManager {
return pipeline;
}
private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReconstruction() {
LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
LinkedListMultimap.create();
for (DatanodeDescriptor dn : nodes) {
@ -574,8 +575,8 @@ public class TestBlockManager {
}
/**
* Test that a source node for a highest-priority replication is chosen even if all available
* source nodes have reached their replication limits.
* Test that a source node for a highest-priority reconstruction is chosen
* even if all available source nodes have reached their replication limits.
*/
@Test
public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {

View File

@ -68,7 +68,7 @@ public class TestBlocksWithNotEnoughRacks {
// Have the NN check for pending replications every second so it
// quickly schedules additional replicas as they are identified.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 1);
// The DNs report blocks every second.
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);

View File

@ -54,7 +54,7 @@ public class TestOverReplicatedBlocks {
conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
Integer.toString(2));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fs = cluster.getFileSystem();

View File

@ -45,10 +45,10 @@ import org.junit.Test;
import org.mockito.Mockito;
/**
* This class tests the internals of PendingReplicationBlocks.java,
* as well as how PendingReplicationBlocks acts in BlockManager
* This class tests the internals of PendingReconstructionBlocks.java, as well
* as how PendingReconstructionBlocks acts in BlockManager
*/
public class TestPendingReplication {
public class TestPendingReconstruction {
final static int TIMEOUT = 3; // 3 seconds
private static final int DFS_REPLICATION_INTERVAL = 1;
// Number of datanodes in the cluster
@ -60,49 +60,49 @@ public class TestPendingReplication {
}
@Test
public void testPendingReplication() {
PendingReplicationBlocks pendingReplications;
pendingReplications = new PendingReplicationBlocks(TIMEOUT * 1000);
pendingReplications.start();
public void testPendingReconstruction() {
PendingReconstructionBlocks pendingReconstructions;
pendingReconstructions = new PendingReconstructionBlocks(TIMEOUT * 1000);
pendingReconstructions.start();
//
// Add 10 blocks to pendingReplications.
// Add 10 blocks to pendingReconstruction.
//
DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10);
for (int i = 0; i < storages.length; i++) {
BlockInfo block = genBlockInfo(i, i, 0);
DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
System.arraycopy(storages, 0, targets, 0, i);
pendingReplications.increment(block,
pendingReconstructions.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors(targets));
}
assertEquals("Size of pendingReplications ",
10, pendingReplications.size());
assertEquals("Size of pendingReconstruction ",
10, pendingReconstructions.size());
//
// remove one item
//
BlockInfo blk = genBlockInfo(8, 8, 0);
pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
assertEquals("pendingReplications.getNumReplicas ",
7, pendingReplications.getNumReplicas(blk));
pendingReconstructions.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
assertEquals("pendingReconstructions.getNumReplicas ",
7, pendingReconstructions.getNumReplicas(blk));
//
// insert the same item twice should be counted as once
//
pendingReplications.increment(blk, storages[0].getDatanodeDescriptor());
assertEquals("pendingReplications.getNumReplicas ",
7, pendingReplications.getNumReplicas(blk));
pendingReconstructions.increment(blk, storages[0].getDatanodeDescriptor());
assertEquals("pendingReconstructions.getNumReplicas ",
7, pendingReconstructions.getNumReplicas(blk));
for (int i = 0; i < 7; i++) {
// removes all replicas
pendingReplications.decrement(blk, storages[i].getDatanodeDescriptor());
pendingReconstructions.decrement(blk, storages[i].getDatanodeDescriptor());
}
assertTrue(pendingReplications.size() == 9);
pendingReplications.increment(blk,
assertTrue(pendingReconstructions.size() == 9);
pendingReconstructions.increment(blk,
DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(8)));
assertTrue(pendingReplications.size() == 10);
assertTrue(pendingReconstructions.size() == 10);
//
// verify that the number of replicas returned
@ -110,14 +110,14 @@ public class TestPendingReplication {
//
for (int i = 0; i < 10; i++) {
BlockInfo block = genBlockInfo(i, i, 0);
int numReplicas = pendingReplications.getNumReplicas(block);
int numReplicas = pendingReconstructions.getNumReplicas(block);
assertTrue(numReplicas == i);
}
//
// verify that nothing has timed out so far
//
assertTrue(pendingReplications.getTimedOutBlocks() == null);
assertTrue(pendingReconstructions.getTimedOutBlocks() == null);
//
// Wait for one second and then insert some more items.
@ -129,17 +129,17 @@ public class TestPendingReplication {
for (int i = 10; i < 15; i++) {
BlockInfo block = genBlockInfo(i, i, 0);
pendingReplications.increment(block,
pendingReconstructions.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(i)));
}
assertTrue(pendingReplications.size() == 15);
assertTrue(pendingReconstructions.size() == 15);
//
// Wait for everything to timeout.
//
int loop = 0;
while (pendingReplications.size() > 0) {
while (pendingReconstructions.size() > 0) {
try {
Thread.sleep(1000);
} catch (Exception e) {
@ -152,24 +152,24 @@ public class TestPendingReplication {
//
// Verify that everything has timed out.
//
assertEquals("Size of pendingReplications ", 0, pendingReplications.size());
Block[] timedOut = pendingReplications.getTimedOutBlocks();
assertEquals("Size of pendingReconstructions ", 0, pendingReconstructions.size());
Block[] timedOut = pendingReconstructions.getTimedOutBlocks();
assertTrue(timedOut != null && timedOut.length == 15);
for (int i = 0; i < timedOut.length; i++) {
assertTrue(timedOut[i].getBlockId() < 15);
}
pendingReplications.stop();
pendingReconstructions.stop();
}
/* Test that processPendingReplications will use the most recent
/* Test that processPendingReconstructions will use the most recent
* blockinfo from the blocksmap by placing a larger genstamp into
* the blocksmap.
*/
@Test
public void testProcessPendingReplications() throws Exception {
public void testProcessPendingReconstructions() throws Exception {
final Configuration conf = new HdfsConfiguration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT);
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, TIMEOUT);
MiniDFSCluster cluster = null;
Block block;
BlockInfo blockInfo;
@ -181,19 +181,19 @@ public class TestPendingReplication {
FSNamesystem fsn = cluster.getNamesystem();
BlockManager blkManager = fsn.getBlockManager();
PendingReplicationBlocks pendingReplications =
blkManager.pendingReplications;
PendingReconstructionBlocks pendingReconstruction =
blkManager.pendingReconstruction;
LowRedundancyBlocks neededReconstruction = blkManager.neededReconstruction;
BlocksMap blocksMap = blkManager.blocksMap;
//
// Add 1 block to pendingReplications with GenerationStamp = 0.
// Add 1 block to pendingReconstructions with GenerationStamp = 0.
//
block = new Block(1, 1, 0);
blockInfo = new BlockInfoContiguous(block, (short) 3);
pendingReplications.increment(blockInfo,
pendingReconstruction.increment(blockInfo,
DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(1)));
BlockCollection bc = Mockito.mock(BlockCollection.class);
@ -201,25 +201,25 @@ public class TestPendingReplication {
blockInfo.setGenerationStamp(1);
blocksMap.addBlockCollection(blockInfo, bc);
assertEquals("Size of pendingReplications ", 1,
pendingReplications.size());
assertEquals("Size of pendingReconstructions ", 1,
pendingReconstruction.size());
// Add a second block to pendingReplications that has no
// Add a second block to pendingReconstructions that has no
// corresponding entry in blocksmap
block = new Block(2, 2, 0);
blockInfo = new BlockInfoContiguous(block, (short) 3);
pendingReplications.increment(blockInfo,
pendingReconstruction.increment(blockInfo,
DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(1)));
// verify 2 blocks in pendingReplications
assertEquals("Size of pendingReplications ", 2,
pendingReplications.size());
// verify 2 blocks in pendingReconstructions
assertEquals("Size of pendingReconstructions ", 2,
pendingReconstruction.size());
//
// Wait for everything to timeout.
//
while (pendingReplications.size() > 0) {
while (pendingReconstruction.size() > 0) {
try {
Thread.sleep(100);
} catch (Exception e) {
@ -252,11 +252,11 @@ public class TestPendingReplication {
}
}
}
/**
* Test if DatanodeProtocol#blockReceivedAndDeleted can correctly update the
* pending replications. Also make sure the blockReceivedAndDeleted call is
* idempotent to the pending replications.
* pending reconstruction. Also make sure the blockReceivedAndDeleted call is
* idempotent to the pending reconstruction.
*/
@Test
public void testBlockReceived() throws Exception {
@ -271,7 +271,7 @@ public class TestPendingReplication {
DistributedFileSystem hdfs = cluster.getFileSystem();
FSNamesystem fsn = cluster.getNamesystem();
BlockManager blkManager = fsn.getBlockManager();
final String file = "/tmp.txt";
final Path filePath = new Path(file);
short replFactor = 1;
@ -286,11 +286,11 @@ public class TestPendingReplication {
hdfs.setReplication(filePath, (short) DATANODE_COUNT);
BlockManagerTestUtil.computeAllPendingWork(blkManager);
assertEquals(1, blkManager.pendingReplications.size());
assertEquals(1, blkManager.pendingReconstruction.size());
INodeFile fileNode = fsn.getFSDirectory().getINode4Write(file).asFile();
BlockInfo[] blocks = fileNode.getBlocks();
assertEquals(DATANODE_COUNT - 1,
blkManager.pendingReplications.getNumReplicas(blocks[0]));
blkManager.pendingReconstruction.getNumReplicas(blocks[0]));
LocatedBlock locatedBlock = hdfs.getClient().getLocatedBlocks(file, 0)
.get(0);
@ -303,7 +303,7 @@ public class TestPendingReplication {
if (!datanodes.get(i).getDatanodeId().equals(existingDn)) {
DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
poolId);
StorageReceivedDeletedBlocks[] report = {
StorageReceivedDeletedBlocks[] report = {
new StorageReceivedDeletedBlocks("Fake-storage-ID-Ignored",
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
@ -314,14 +314,14 @@ public class TestPendingReplication {
// IBRs are async, make sure the NN processes all of them.
cluster.getNamesystem().getBlockManager().flushBlockOps();
assertEquals(DATANODE_COUNT - 3,
blkManager.pendingReplications.getNumReplicas(blocks[0]));
blkManager.pendingReconstruction.getNumReplicas(blocks[0]));
// let the same datanodes report again
for (int i = 0; i < DATANODE_COUNT && reportDnNum < 2; i++) {
if (!datanodes.get(i).getDatanodeId().equals(existingDn)) {
DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
poolId);
StorageReceivedDeletedBlocks[] report =
StorageReceivedDeletedBlocks[] report =
{ new StorageReceivedDeletedBlocks("Fake-storage-ID-Ignored",
new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
@ -332,7 +332,7 @@ public class TestPendingReplication {
cluster.getNamesystem().getBlockManager().flushBlockOps();
assertEquals(DATANODE_COUNT - 3,
blkManager.pendingReplications.getNumReplicas(blocks[0]));
blkManager.pendingReconstruction.getNumReplicas(blocks[0]));
// re-enable heartbeat for the datanode that has data
for (int i = 0; i < DATANODE_COUNT; i++) {
@ -342,18 +342,18 @@ public class TestPendingReplication {
}
Thread.sleep(5000);
assertEquals(0, blkManager.pendingReplications.size());
assertEquals(0, blkManager.pendingReconstruction.size());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test if BlockManager can correctly remove corresponding pending records
* when a file is deleted
*
*
* @throws Exception
*/
@Test
@ -362,12 +362,12 @@ public class TestPendingReplication {
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFS_REPLICATION_INTERVAL);
CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFS_REPLICATION_INTERVAL);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
DATANODE_COUNT).build();
cluster.waitActive();
FSNamesystem namesystem = cluster.getNamesystem();
BlockManager bm = namesystem.getBlockManager();
DistributedFileSystem fs = cluster.getFileSystem();
@ -375,12 +375,12 @@ public class TestPendingReplication {
// 1. create a file
Path filePath = new Path("/tmp.txt");
DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0L);
// 2. disable the heartbeats
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
// 3. mark a couple of blocks as corrupt
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), filePath.toString(), 0, 1).get(0);
@ -395,20 +395,20 @@ public class TestPendingReplication {
}
BlockManagerTestUtil.computeAllPendingWork(bm);
BlockManagerTestUtil.updateState(bm);
assertEquals(bm.getPendingReplicationBlocksCount(), 1L);
assertEquals(bm.getPendingReconstructionBlocksCount(), 1L);
BlockInfo storedBlock = bm.getStoredBlock(block.getBlock().getLocalBlock());
assertEquals(bm.pendingReplications.getNumReplicas(storedBlock), 2);
assertEquals(bm.pendingReconstruction.getNumReplicas(storedBlock), 2);
// 4. delete the file
fs.delete(filePath, true);
// retry at most 10 times, each time sleep for 1s. Note that 10s is much
// less than the default pending record timeout (5~10min)
int retries = 10;
long pendingNum = bm.getPendingReplicationBlocksCount();
int retries = 10;
long pendingNum = bm.getPendingReconstructionBlocksCount();
while (pendingNum != 0 && retries-- > 0) {
Thread.sleep(1000); // let NN do the deletion
BlockManagerTestUtil.updateState(bm);
pendingNum = bm.getPendingReplicationBlocksCount();
pendingNum = bm.getPendingReconstructionBlocksCount();
}
assertEquals(pendingNum, 0L);
} finally {

View File

@ -96,8 +96,8 @@ public class TestDecommissioningStatus {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
1000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
4);
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 4);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);

View File

@ -98,7 +98,7 @@ public class TestFileTruncate {
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, SHORT_HEARTBEAT);
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf)
.format(true)
.numDataNodes(DATANODE_NUM)

View File

@ -82,7 +82,7 @@ public class TestHostsFiles {
// Have the NN check for pending replications every second so it
// quickly schedules additional replicas as they are identified.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 1);
// The DNs report blocks every second.
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);

View File

@ -156,7 +156,7 @@ public class TestMetaSave {
line = reader.readLine();
assertTrue(line.equals("Mis-replicated blocks that have been postponed:"));
line = reader.readLine();
assertTrue(line.equals("Metasave: Blocks being replicated: 0"));
assertTrue(line.equals("Metasave: Blocks being reconstructed: 0"));
line = reader.readLine();
assertTrue(line.equals("Metasave: Blocks 2 waiting deletion from 1 datanodes."));
//skip 2 lines to reach HDFS-9033 scenario.

View File

@ -58,7 +58,7 @@ public class TestProcessCorruptBlocks {
public void testWhenDecreasingReplication() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
conf.set(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fs = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
@ -113,7 +113,7 @@ public class TestProcessCorruptBlocks {
public void testByAddingAnExtraDataNode() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
conf.set(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
FileSystem fs = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
@ -164,7 +164,7 @@ public class TestProcessCorruptBlocks {
public void testWithReplicationFactorAsOne() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
conf.set(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
FileSystem fs = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
@ -218,7 +218,7 @@ public class TestProcessCorruptBlocks {
public void testWithAllCorruptReplicas() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
conf.set(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fs = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();

View File

@ -209,7 +209,7 @@ public class TestReconstructStripedBlocks {
cellSize, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
assertEquals(0, getNumberOfBlocksToBeErasureCoded(cluster));
assertEquals(0, bm.getPendingReplicationBlocksCount());
assertEquals(0, bm.getPendingReconstructionBlocksCount());
// missing 1 block, so 1 task should be scheduled
DatanodeInfo dn0 = lbs[0].getLocations()[0];
@ -217,7 +217,7 @@ public class TestReconstructStripedBlocks {
cluster.setDataNodeDead(dn0);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
assertEquals(1, bm.getPendingReplicationBlocksCount());
assertEquals(1, bm.getPendingReconstructionBlocksCount());
// missing another block, but no new task should be scheduled because
// previous task isn't finished.
@ -226,7 +226,7 @@ public class TestReconstructStripedBlocks {
cluster.setDataNodeDead(dn1);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
assertEquals(1, bm.getPendingReplicationBlocksCount());
assertEquals(1, bm.getPendingReconstructionBlocksCount());
} finally {
cluster.shutdown();
}

View File

@ -296,7 +296,7 @@ public class TestDNFencing {
LOG.info("Getting more replication work computed");
}
BlockManager bm1 = nn1.getNamesystem().getBlockManager();
while (bm1.getPendingReplicationBlocksCount() > 0) {
while (bm1.getPendingReconstructionBlocksCount() > 0) {
BlockManagerTestUtil.updateState(bm1);
cluster.triggerHeartbeats();
Thread.sleep(1000);