HDFS-7912. Erasure Coding: track BlockInfo instead of Block in UnderReplicatedBlocks and PendingReplicationBlocks. Contributed by Jing Zhao.
This commit is contained in:
parent
26773d9d6c
commit
a38a37c634
@ -1346,7 +1346,7 @@ int computeInvalidateWork(int nodesToProcess) {
|
|||||||
* @return number of blocks scheduled for replication during this iteration.
|
* @return number of blocks scheduled for replication during this iteration.
|
||||||
*/
|
*/
|
||||||
int computeReplicationWork(int blocksToProcess) {
|
int computeReplicationWork(int blocksToProcess) {
|
||||||
List<List<Block>> blocksToReplicate = null;
|
List<List<BlockInfo>> blocksToReplicate = null;
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
try {
|
try {
|
||||||
// Choose the blocks to be replicated
|
// Choose the blocks to be replicated
|
||||||
@ -1364,7 +1364,7 @@ int computeReplicationWork(int blocksToProcess) {
|
|||||||
* @return the number of blocks scheduled for replication
|
* @return the number of blocks scheduled for replication
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
|
int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
|
||||||
int requiredReplication, numEffectiveReplicas;
|
int requiredReplication, numEffectiveReplicas;
|
||||||
List<DatanodeDescriptor> containingNodes;
|
List<DatanodeDescriptor> containingNodes;
|
||||||
DatanodeDescriptor srcNode;
|
DatanodeDescriptor srcNode;
|
||||||
@ -1378,7 +1378,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
|
|||||||
try {
|
try {
|
||||||
synchronized (neededReplications) {
|
synchronized (neededReplications) {
|
||||||
for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
|
for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
|
||||||
for (Block block : blocksToReplicate.get(priority)) {
|
for (BlockInfo block : blocksToReplicate.get(priority)) {
|
||||||
// block should belong to a file
|
// block should belong to a file
|
||||||
bc = blocksMap.getBlockCollection(block);
|
bc = blocksMap.getBlockCollection(block);
|
||||||
// abandoned block or block reopened for append
|
// abandoned block or block reopened for append
|
||||||
@ -1462,7 +1462,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
synchronized (neededReplications) {
|
synchronized (neededReplications) {
|
||||||
Block block = rw.block;
|
BlockInfo block = rw.block;
|
||||||
int priority = rw.priority;
|
int priority = rw.priority;
|
||||||
// Recheck since global lock was released
|
// Recheck since global lock was released
|
||||||
// block should belong to a file
|
// block should belong to a file
|
||||||
@ -1724,7 +1724,7 @@ else if (node.isDecommissionInProgress()) {
|
|||||||
* and put them back into the neededReplication queue
|
* and put them back into the neededReplication queue
|
||||||
*/
|
*/
|
||||||
private void processPendingReplications() {
|
private void processPendingReplications() {
|
||||||
Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
|
BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks();
|
||||||
if (timedOutItems != null) {
|
if (timedOutItems != null) {
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
try {
|
try {
|
||||||
@ -2917,13 +2917,13 @@ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
|
|||||||
|
|
||||||
/** Set replication for the blocks. */
|
/** Set replication for the blocks. */
|
||||||
public void setReplication(final short oldRepl, final short newRepl,
|
public void setReplication(final short oldRepl, final short newRepl,
|
||||||
final String src, final Block... blocks) {
|
final String src, final BlockInfoContiguous... blocks) {
|
||||||
if (newRepl == oldRepl) {
|
if (newRepl == oldRepl) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update needReplication priority queues
|
// update needReplication priority queues
|
||||||
for(Block b : blocks) {
|
for(BlockInfoContiguous b : blocks) {
|
||||||
updateNeededReplications(b, 0, newRepl-oldRepl);
|
updateNeededReplications(b, 0, newRepl-oldRepl);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2931,7 +2931,7 @@ public void setReplication(final short oldRepl, final short newRepl,
|
|||||||
// old replication > the new one; need to remove copies
|
// old replication > the new one; need to remove copies
|
||||||
LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
|
LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
|
||||||
+ " for " + src);
|
+ " for " + src);
|
||||||
for(Block b : blocks) {
|
for(BlockInfoContiguous b : blocks) {
|
||||||
processOverReplicatedBlock(b, newRepl, null, null);
|
processOverReplicatedBlock(b, newRepl, null, null);
|
||||||
}
|
}
|
||||||
} else { // replication factor is increased
|
} else { // replication factor is increased
|
||||||
@ -3114,7 +3114,8 @@ public void removeStoredBlock(Block block, DatanodeDescriptor node) {
|
|||||||
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
|
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
|
||||||
assert (namesystem.hasWriteLock());
|
assert (namesystem.hasWriteLock());
|
||||||
{
|
{
|
||||||
if (!blocksMap.removeNode(block, node)) {
|
BlockInfo storedBlock = getStoredBlock(block);
|
||||||
|
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
|
||||||
blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
|
blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
|
||||||
" removed from node {}", block, node);
|
" removed from node {}", block, node);
|
||||||
return;
|
return;
|
||||||
@ -3128,8 +3129,8 @@ public void removeStoredBlock(Block block, DatanodeDescriptor node) {
|
|||||||
//
|
//
|
||||||
BlockCollection bc = blocksMap.getBlockCollection(block);
|
BlockCollection bc = blocksMap.getBlockCollection(block);
|
||||||
if (bc != null) {
|
if (bc != null) {
|
||||||
namesystem.decrementSafeBlockCount(block);
|
namesystem.decrementSafeBlockCount(storedBlock);
|
||||||
updateNeededReplications(block, -1, 0);
|
updateNeededReplications(storedBlock, -1, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
@ -3203,7 +3204,7 @@ void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
|
|||||||
//
|
//
|
||||||
// Modify the blocks->datanode map and node's map.
|
// Modify the blocks->datanode map and node's map.
|
||||||
//
|
//
|
||||||
pendingReplications.decrement(block, node);
|
pendingReplications.decrement(getStoredBlock(block), node);
|
||||||
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
|
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
|
||||||
delHintNode);
|
delHintNode);
|
||||||
}
|
}
|
||||||
@ -3318,7 +3319,7 @@ public void processIncrementalBlockReport(final DatanodeID nodeID,
|
|||||||
* For a striped block, this includes nodes storing blocks belonging to the
|
* For a striped block, this includes nodes storing blocks belonging to the
|
||||||
* striped block group.
|
* striped block group.
|
||||||
*/
|
*/
|
||||||
public NumberReplicas countNodes(Block b) {
|
public NumberReplicas countNodes(BlockInfo b) {
|
||||||
int decommissioned = 0;
|
int decommissioned = 0;
|
||||||
int decommissioning = 0;
|
int decommissioning = 0;
|
||||||
int live = 0;
|
int live = 0;
|
||||||
@ -3351,11 +3352,11 @@ public NumberReplicas countNodes(Block b) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simpler, faster form of {@link #countNodes(Block)} that only returns the number
|
* Simpler, faster form of {@link #countNodes} that only returns the number
|
||||||
* of live nodes. If in startup safemode (or its 30-sec extension period),
|
* of live nodes. If in startup safemode (or its 30-sec extension period),
|
||||||
* then it gains speed by ignoring issues of excess replicas or nodes
|
* then it gains speed by ignoring issues of excess replicas or nodes
|
||||||
* that are decommissioned or in process of becoming decommissioned.
|
* that are decommissioned or in process of becoming decommissioned.
|
||||||
* If not in startup, then it calls {@link #countNodes(Block)} instead.
|
* If not in startup, then it calls {@link #countNodes} instead.
|
||||||
*
|
*
|
||||||
* @param b - the block being tested
|
* @param b - the block being tested
|
||||||
* @return count of live nodes for this block
|
* @return count of live nodes for this block
|
||||||
@ -3385,10 +3386,10 @@ void processOverReplicatedBlocksOnReCommission(
|
|||||||
if (!namesystem.isPopulatingReplQueues()) {
|
if (!namesystem.isPopulatingReplQueues()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
final Iterator<? extends Block> it = srcNode.getBlockIterator();
|
final Iterator<BlockInfo> it = srcNode.getBlockIterator();
|
||||||
int numOverReplicated = 0;
|
int numOverReplicated = 0;
|
||||||
while(it.hasNext()) {
|
while(it.hasNext()) {
|
||||||
final Block block = it.next();
|
final BlockInfo block = it.next();
|
||||||
BlockCollection bc = blocksMap.getBlockCollection(block);
|
BlockCollection bc = blocksMap.getBlockCollection(block);
|
||||||
short expectedReplication = bc.getPreferredBlockReplication();
|
short expectedReplication = bc.getPreferredBlockReplication();
|
||||||
NumberReplicas num = countNodes(block);
|
NumberReplicas num = countNodes(block);
|
||||||
@ -3452,7 +3453,7 @@ public int getTotalBlocks() {
|
|||||||
return blocksMap.size();
|
return blocksMap.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeBlock(Block block) {
|
public void removeBlock(BlockInfo block) {
|
||||||
assert namesystem.hasWriteLock();
|
assert namesystem.hasWriteLock();
|
||||||
// No need to ACK blocks that are being removed entirely
|
// No need to ACK blocks that are being removed entirely
|
||||||
// from the namespace, since the removal of the associated
|
// from the namespace, since the removal of the associated
|
||||||
@ -3481,7 +3482,7 @@ public BlockInfo getStoredBlock(Block block) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** updates a block in under replication queue */
|
/** updates a block in under replication queue */
|
||||||
private void updateNeededReplications(final Block block,
|
private void updateNeededReplications(final BlockInfo block,
|
||||||
final int curReplicasDelta, int expectedReplicasDelta) {
|
final int curReplicasDelta, int expectedReplicasDelta) {
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
try {
|
try {
|
||||||
@ -3513,7 +3514,7 @@ private void updateNeededReplications(final Block block,
|
|||||||
*/
|
*/
|
||||||
public void checkReplication(BlockCollection bc) {
|
public void checkReplication(BlockCollection bc) {
|
||||||
final short expected = bc.getPreferredBlockReplication();
|
final short expected = bc.getPreferredBlockReplication();
|
||||||
for (Block block : bc.getBlocks()) {
|
for (BlockInfo block : bc.getBlocks()) {
|
||||||
final NumberReplicas n = countNodes(block);
|
final NumberReplicas n = countNodes(block);
|
||||||
if (isNeededReplication(block, expected, n.liveReplicas())) {
|
if (isNeededReplication(block, expected, n.liveReplicas())) {
|
||||||
neededReplications.add(block, n.liveReplicas(),
|
neededReplications.add(block, n.liveReplicas(),
|
||||||
@ -3690,7 +3691,7 @@ public int getCapacity() {
|
|||||||
/**
|
/**
|
||||||
* Return an iterator over the set of blocks for which there are no replicas.
|
* Return an iterator over the set of blocks for which there are no replicas.
|
||||||
*/
|
*/
|
||||||
public Iterator<Block> getCorruptReplicaBlockIterator() {
|
public Iterator<BlockInfo> getCorruptReplicaBlockIterator() {
|
||||||
return neededReplications.iterator(
|
return neededReplications.iterator(
|
||||||
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
|
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
|
||||||
}
|
}
|
||||||
@ -3815,7 +3816,7 @@ public static LocatedBlock newLocatedBlock(
|
|||||||
|
|
||||||
private static class ReplicationWork {
|
private static class ReplicationWork {
|
||||||
|
|
||||||
private final Block block;
|
private final BlockInfo block;
|
||||||
private final BlockCollection bc;
|
private final BlockCollection bc;
|
||||||
|
|
||||||
private final DatanodeDescriptor srcNode;
|
private final DatanodeDescriptor srcNode;
|
||||||
@ -3826,7 +3827,7 @@ private static class ReplicationWork {
|
|||||||
private DatanodeStorageInfo targets[];
|
private DatanodeStorageInfo targets[];
|
||||||
private final int priority;
|
private final int priority;
|
||||||
|
|
||||||
public ReplicationWork(Block block,
|
public ReplicationWork(BlockInfo block,
|
||||||
BlockCollection bc,
|
BlockCollection bc,
|
||||||
DatanodeDescriptor srcNode,
|
DatanodeDescriptor srcNode,
|
||||||
List<DatanodeDescriptor> containingNodes,
|
List<DatanodeDescriptor> containingNodes,
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import java.sql.Time;
|
import java.sql.Time;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -46,8 +47,8 @@
|
|||||||
class PendingReplicationBlocks {
|
class PendingReplicationBlocks {
|
||||||
private static final Logger LOG = BlockManager.LOG;
|
private static final Logger LOG = BlockManager.LOG;
|
||||||
|
|
||||||
private final Map<Block, PendingBlockInfo> pendingReplications;
|
private final Map<BlockInfo, PendingBlockInfo> pendingReplications;
|
||||||
private final ArrayList<Block> timedOutItems;
|
private final ArrayList<BlockInfo> timedOutItems;
|
||||||
Daemon timerThread = null;
|
Daemon timerThread = null;
|
||||||
private volatile boolean fsRunning = true;
|
private volatile boolean fsRunning = true;
|
||||||
|
|
||||||
@ -62,8 +63,8 @@ class PendingReplicationBlocks {
|
|||||||
if ( timeoutPeriod > 0 ) {
|
if ( timeoutPeriod > 0 ) {
|
||||||
this.timeout = timeoutPeriod;
|
this.timeout = timeoutPeriod;
|
||||||
}
|
}
|
||||||
pendingReplications = new HashMap<Block, PendingBlockInfo>();
|
pendingReplications = new HashMap<>();
|
||||||
timedOutItems = new ArrayList<Block>();
|
timedOutItems = new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
void start() {
|
void start() {
|
||||||
@ -76,7 +77,7 @@ void start() {
|
|||||||
* @param block The corresponding block
|
* @param block The corresponding block
|
||||||
* @param targets The DataNodes where replicas of the block should be placed
|
* @param targets The DataNodes where replicas of the block should be placed
|
||||||
*/
|
*/
|
||||||
void increment(Block block, DatanodeDescriptor[] targets) {
|
void increment(BlockInfo block, DatanodeDescriptor[] targets) {
|
||||||
synchronized (pendingReplications) {
|
synchronized (pendingReplications) {
|
||||||
PendingBlockInfo found = pendingReplications.get(block);
|
PendingBlockInfo found = pendingReplications.get(block);
|
||||||
if (found == null) {
|
if (found == null) {
|
||||||
@ -93,9 +94,9 @@ void increment(Block block, DatanodeDescriptor[] targets) {
|
|||||||
* Decrement the number of pending replication requests
|
* Decrement the number of pending replication requests
|
||||||
* for this block.
|
* for this block.
|
||||||
*
|
*
|
||||||
* @param The DataNode that finishes the replication
|
* @param dn The DataNode that finishes the replication
|
||||||
*/
|
*/
|
||||||
void decrement(Block block, DatanodeDescriptor dn) {
|
void decrement(BlockInfo block, DatanodeDescriptor dn) {
|
||||||
synchronized (pendingReplications) {
|
synchronized (pendingReplications) {
|
||||||
PendingBlockInfo found = pendingReplications.get(block);
|
PendingBlockInfo found = pendingReplications.get(block);
|
||||||
if (found != null) {
|
if (found != null) {
|
||||||
@ -115,7 +116,7 @@ void decrement(Block block, DatanodeDescriptor dn) {
|
|||||||
* @param block The given block whose pending replication requests need to be
|
* @param block The given block whose pending replication requests need to be
|
||||||
* removed
|
* removed
|
||||||
*/
|
*/
|
||||||
void remove(Block block) {
|
void remove(BlockInfo block) {
|
||||||
synchronized (pendingReplications) {
|
synchronized (pendingReplications) {
|
||||||
pendingReplications.remove(block);
|
pendingReplications.remove(block);
|
||||||
}
|
}
|
||||||
@ -138,7 +139,7 @@ int size() {
|
|||||||
/**
|
/**
|
||||||
* How many copies of this block is pending replication?
|
* How many copies of this block is pending replication?
|
||||||
*/
|
*/
|
||||||
int getNumReplicas(Block block) {
|
int getNumReplicas(BlockInfo block) {
|
||||||
synchronized (pendingReplications) {
|
synchronized (pendingReplications) {
|
||||||
PendingBlockInfo found = pendingReplications.get(block);
|
PendingBlockInfo found = pendingReplications.get(block);
|
||||||
if (found != null) {
|
if (found != null) {
|
||||||
@ -153,13 +154,13 @@ int getNumReplicas(Block block) {
|
|||||||
* replication requests. Returns null if no blocks have
|
* replication requests. Returns null if no blocks have
|
||||||
* timed out.
|
* timed out.
|
||||||
*/
|
*/
|
||||||
Block[] getTimedOutBlocks() {
|
BlockInfo[] getTimedOutBlocks() {
|
||||||
synchronized (timedOutItems) {
|
synchronized (timedOutItems) {
|
||||||
if (timedOutItems.size() <= 0) {
|
if (timedOutItems.size() <= 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
Block[] blockList = timedOutItems.toArray(
|
BlockInfo[] blockList = timedOutItems.toArray(
|
||||||
new Block[timedOutItems.size()]);
|
new BlockInfo[timedOutItems.size()]);
|
||||||
timedOutItems.clear();
|
timedOutItems.clear();
|
||||||
return blockList;
|
return blockList;
|
||||||
}
|
}
|
||||||
@ -179,7 +180,7 @@ static class PendingBlockInfo {
|
|||||||
PendingBlockInfo(DatanodeDescriptor[] targets) {
|
PendingBlockInfo(DatanodeDescriptor[] targets) {
|
||||||
this.timeStamp = monotonicNow();
|
this.timeStamp = monotonicNow();
|
||||||
this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
|
this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
|
||||||
: new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
|
: new ArrayList<>(Arrays.asList(targets));
|
||||||
}
|
}
|
||||||
|
|
||||||
long getTimeStamp() {
|
long getTimeStamp() {
|
||||||
@ -192,9 +193,7 @@ void setTimeStamp() {
|
|||||||
|
|
||||||
void incrementReplicas(DatanodeDescriptor... newTargets) {
|
void incrementReplicas(DatanodeDescriptor... newTargets) {
|
||||||
if (newTargets != null) {
|
if (newTargets != null) {
|
||||||
for (DatanodeDescriptor dn : newTargets) {
|
Collections.addAll(targets, newTargets);
|
||||||
targets.add(dn);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,17 +231,17 @@ public void run() {
|
|||||||
*/
|
*/
|
||||||
void pendingReplicationCheck() {
|
void pendingReplicationCheck() {
|
||||||
synchronized (pendingReplications) {
|
synchronized (pendingReplications) {
|
||||||
Iterator<Map.Entry<Block, PendingBlockInfo>> iter =
|
Iterator<Map.Entry<BlockInfo, PendingBlockInfo>> iter =
|
||||||
pendingReplications.entrySet().iterator();
|
pendingReplications.entrySet().iterator();
|
||||||
long now = monotonicNow();
|
long now = monotonicNow();
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug("PendingReplicationMonitor checking Q");
|
LOG.debug("PendingReplicationMonitor checking Q");
|
||||||
}
|
}
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
Map.Entry<Block, PendingBlockInfo> entry = iter.next();
|
Map.Entry<BlockInfo, PendingBlockInfo> entry = iter.next();
|
||||||
PendingBlockInfo pendingBlock = entry.getValue();
|
PendingBlockInfo pendingBlock = entry.getValue();
|
||||||
if (now > pendingBlock.getTimeStamp() + timeout) {
|
if (now > pendingBlock.getTimeStamp() + timeout) {
|
||||||
Block block = entry.getKey();
|
BlockInfo block = entry.getKey();
|
||||||
synchronized (timedOutItems) {
|
synchronized (timedOutItems) {
|
||||||
timedOutItems.add(block);
|
timedOutItems.add(block);
|
||||||
}
|
}
|
||||||
@ -275,16 +274,14 @@ void metaSave(PrintWriter out) {
|
|||||||
synchronized (pendingReplications) {
|
synchronized (pendingReplications) {
|
||||||
out.println("Metasave: Blocks being replicated: " +
|
out.println("Metasave: Blocks being replicated: " +
|
||||||
pendingReplications.size());
|
pendingReplications.size());
|
||||||
Iterator<Map.Entry<Block, PendingBlockInfo>> iter =
|
for (Map.Entry<BlockInfo, PendingBlockInfo> entry :
|
||||||
pendingReplications.entrySet().iterator();
|
pendingReplications.entrySet()) {
|
||||||
while (iter.hasNext()) {
|
|
||||||
Map.Entry<Block, PendingBlockInfo> entry = iter.next();
|
|
||||||
PendingBlockInfo pendingBlock = entry.getValue();
|
PendingBlockInfo pendingBlock = entry.getValue();
|
||||||
Block block = entry.getKey();
|
Block block = entry.getKey();
|
||||||
out.println(block +
|
out.println(block +
|
||||||
" StartTime: " + new Time(pendingBlock.timeStamp) +
|
" StartTime: " + new Time(pendingBlock.timeStamp) +
|
||||||
" NumReplicaInProgress: " +
|
" NumReplicaInProgress: " +
|
||||||
pendingBlock.getNumReplicas());
|
pendingBlock.getNumReplicas());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
|
||||||
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
|
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
|
||||||
@ -35,7 +34,7 @@
|
|||||||
*
|
*
|
||||||
* <p/>
|
* <p/>
|
||||||
* The policy for choosing which priority to give added blocks
|
* The policy for choosing which priority to give added blocks
|
||||||
* is implemented in {@link #getPriority(Block, int, int, int)}.
|
* is implemented in {@link #getPriority(int, int, int)}.
|
||||||
* </p>
|
* </p>
|
||||||
* <p>The queue order is as follows:</p>
|
* <p>The queue order is as follows:</p>
|
||||||
* <ol>
|
* <ol>
|
||||||
@ -62,7 +61,7 @@
|
|||||||
* blocks that are not corrupt higher priority.</li>
|
* blocks that are not corrupt higher priority.</li>
|
||||||
* </ol>
|
* </ol>
|
||||||
*/
|
*/
|
||||||
class UnderReplicatedBlocks implements Iterable<Block> {
|
class UnderReplicatedBlocks implements Iterable<BlockInfo> {
|
||||||
/** The total number of queues : {@value} */
|
/** The total number of queues : {@value} */
|
||||||
static final int LEVEL = 5;
|
static final int LEVEL = 5;
|
||||||
/** The queue with the highest priority: {@value} */
|
/** The queue with the highest priority: {@value} */
|
||||||
@ -78,8 +77,8 @@ class UnderReplicatedBlocks implements Iterable<Block> {
|
|||||||
/** The queue for corrupt blocks: {@value} */
|
/** The queue for corrupt blocks: {@value} */
|
||||||
static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
|
static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
|
||||||
/** the queues themselves */
|
/** the queues themselves */
|
||||||
private final List<LightWeightLinkedSet<Block>> priorityQueues
|
private final List<LightWeightLinkedSet<BlockInfo>> priorityQueues
|
||||||
= new ArrayList<LightWeightLinkedSet<Block>>(LEVEL);
|
= new ArrayList<>(LEVEL);
|
||||||
|
|
||||||
/** The number of corrupt blocks with replication factor 1 */
|
/** The number of corrupt blocks with replication factor 1 */
|
||||||
private int corruptReplOneBlocks = 0;
|
private int corruptReplOneBlocks = 0;
|
||||||
@ -87,7 +86,7 @@ class UnderReplicatedBlocks implements Iterable<Block> {
|
|||||||
/** Create an object. */
|
/** Create an object. */
|
||||||
UnderReplicatedBlocks() {
|
UnderReplicatedBlocks() {
|
||||||
for (int i = 0; i < LEVEL; i++) {
|
for (int i = 0; i < LEVEL; i++) {
|
||||||
priorityQueues.add(new LightWeightLinkedSet<Block>());
|
priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,8 +130,8 @@ synchronized int getCorruptReplOneBlockSize() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Check if a block is in the neededReplication queue */
|
/** Check if a block is in the neededReplication queue */
|
||||||
synchronized boolean contains(Block block) {
|
synchronized boolean contains(BlockInfo block) {
|
||||||
for(LightWeightLinkedSet<Block> set : priorityQueues) {
|
for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) {
|
||||||
if (set.contains(block)) {
|
if (set.contains(block)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -141,13 +140,11 @@ synchronized boolean contains(Block block) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Return the priority of a block
|
/** Return the priority of a block
|
||||||
* @param block a under replicated block
|
|
||||||
* @param curReplicas current number of replicas of the block
|
* @param curReplicas current number of replicas of the block
|
||||||
* @param expectedReplicas expected number of replicas of the block
|
* @param expectedReplicas expected number of replicas of the block
|
||||||
* @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
|
* @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
|
||||||
*/
|
*/
|
||||||
private int getPriority(Block block,
|
private int getPriority(int curReplicas,
|
||||||
int curReplicas,
|
|
||||||
int decommissionedReplicas,
|
int decommissionedReplicas,
|
||||||
int expectedReplicas) {
|
int expectedReplicas) {
|
||||||
assert curReplicas >= 0 : "Negative replicas!";
|
assert curReplicas >= 0 : "Negative replicas!";
|
||||||
@ -183,12 +180,12 @@ private int getPriority(Block block,
|
|||||||
* @param expectedReplicas expected number of replicas of the block
|
* @param expectedReplicas expected number of replicas of the block
|
||||||
* @return true if the block was added to a queue.
|
* @return true if the block was added to a queue.
|
||||||
*/
|
*/
|
||||||
synchronized boolean add(Block block,
|
synchronized boolean add(BlockInfo block,
|
||||||
int curReplicas,
|
int curReplicas,
|
||||||
int decomissionedReplicas,
|
int decomissionedReplicas,
|
||||||
int expectedReplicas) {
|
int expectedReplicas) {
|
||||||
assert curReplicas >= 0 : "Negative replicas!";
|
assert curReplicas >= 0 : "Negative replicas!";
|
||||||
int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
|
int priLevel = getPriority(curReplicas, decomissionedReplicas,
|
||||||
expectedReplicas);
|
expectedReplicas);
|
||||||
if(priorityQueues.get(priLevel).add(block)) {
|
if(priorityQueues.get(priLevel).add(block)) {
|
||||||
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
|
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
|
||||||
@ -207,11 +204,11 @@ synchronized boolean add(Block block,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** remove a block from a under replication queue */
|
/** remove a block from a under replication queue */
|
||||||
synchronized boolean remove(Block block,
|
synchronized boolean remove(BlockInfo block,
|
||||||
int oldReplicas,
|
int oldReplicas,
|
||||||
int decommissionedReplicas,
|
int decommissionedReplicas,
|
||||||
int oldExpectedReplicas) {
|
int oldExpectedReplicas) {
|
||||||
int priLevel = getPriority(block, oldReplicas,
|
int priLevel = getPriority(oldReplicas,
|
||||||
decommissionedReplicas,
|
decommissionedReplicas,
|
||||||
oldExpectedReplicas);
|
oldExpectedReplicas);
|
||||||
boolean removedBlock = remove(block, priLevel);
|
boolean removedBlock = remove(block, priLevel);
|
||||||
@ -241,7 +238,7 @@ synchronized boolean remove(Block block,
|
|||||||
* @param priLevel expected privilege level
|
* @param priLevel expected privilege level
|
||||||
* @return true if the block was found and removed from one of the priority queues
|
* @return true if the block was found and removed from one of the priority queues
|
||||||
*/
|
*/
|
||||||
boolean remove(Block block, int priLevel) {
|
boolean remove(BlockInfo block, int priLevel) {
|
||||||
if(priLevel >= 0 && priLevel < LEVEL
|
if(priLevel >= 0 && priLevel < LEVEL
|
||||||
&& priorityQueues.get(priLevel).remove(block)) {
|
&& priorityQueues.get(priLevel).remove(block)) {
|
||||||
NameNode.blockStateChangeLog.debug(
|
NameNode.blockStateChangeLog.debug(
|
||||||
@ -279,14 +276,14 @@ boolean remove(Block block, int priLevel) {
|
|||||||
* @param curReplicasDelta the change in the replicate count from before
|
* @param curReplicasDelta the change in the replicate count from before
|
||||||
* @param expectedReplicasDelta the change in the expected replica count from before
|
* @param expectedReplicasDelta the change in the expected replica count from before
|
||||||
*/
|
*/
|
||||||
synchronized void update(Block block, int curReplicas,
|
synchronized void update(BlockInfo block, int curReplicas,
|
||||||
int decommissionedReplicas,
|
int decommissionedReplicas,
|
||||||
int curExpectedReplicas,
|
int curExpectedReplicas,
|
||||||
int curReplicasDelta, int expectedReplicasDelta) {
|
int curReplicasDelta, int expectedReplicasDelta) {
|
||||||
int oldReplicas = curReplicas-curReplicasDelta;
|
int oldReplicas = curReplicas-curReplicasDelta;
|
||||||
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
|
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
|
||||||
int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas);
|
int curPri = getPriority(curReplicas, decommissionedReplicas, curExpectedReplicas);
|
||||||
int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas);
|
int oldPri = getPriority(oldReplicas, decommissionedReplicas, oldExpectedReplicas);
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
|
NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
|
||||||
block +
|
block +
|
||||||
@ -336,12 +333,12 @@ synchronized void update(Block block, int curReplicas,
|
|||||||
* @return Return a list of block lists to be replicated. The block list index
|
* @return Return a list of block lists to be replicated. The block list index
|
||||||
* represents its replication priority.
|
* represents its replication priority.
|
||||||
*/
|
*/
|
||||||
public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
|
public synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
|
||||||
int blocksToProcess) {
|
int blocksToProcess) {
|
||||||
// initialize data structure for the return value
|
// initialize data structure for the return value
|
||||||
List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);
|
List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
|
||||||
for (int i = 0; i < LEVEL; i++) {
|
for (int i = 0; i < LEVEL; i++) {
|
||||||
blocksToReplicate.add(new ArrayList<Block>());
|
blocksToReplicate.add(new ArrayList<BlockInfo>());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (size() == 0) { // There are no blocks to collect.
|
if (size() == 0) { // There are no blocks to collect.
|
||||||
@ -364,7 +361,7 @@ public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
|
|||||||
// Loop through all remaining blocks in the list.
|
// Loop through all remaining blocks in the list.
|
||||||
while (blockCount < blocksToProcess
|
while (blockCount < blocksToProcess
|
||||||
&& neededReplicationsIterator.hasNext()) {
|
&& neededReplicationsIterator.hasNext()) {
|
||||||
Block block = neededReplicationsIterator.next();
|
BlockInfo block = neededReplicationsIterator.next();
|
||||||
blocksToReplicate.get(priority).add(block);
|
blocksToReplicate.get(priority).add(block);
|
||||||
blockCount++;
|
blockCount++;
|
||||||
}
|
}
|
||||||
@ -396,10 +393,10 @@ public synchronized BlockIterator iterator() {
|
|||||||
/**
|
/**
|
||||||
* An iterator over blocks.
|
* An iterator over blocks.
|
||||||
*/
|
*/
|
||||||
class BlockIterator implements Iterator<Block> {
|
class BlockIterator implements Iterator<BlockInfo> {
|
||||||
private int level;
|
private int level;
|
||||||
private boolean isIteratorForLevel = false;
|
private boolean isIteratorForLevel = false;
|
||||||
private final List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
|
private final List<Iterator<BlockInfo>> iterators = new ArrayList<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct an iterator over all queues.
|
* Construct an iterator over all queues.
|
||||||
@ -431,7 +428,7 @@ private void update() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Block next() {
|
public BlockInfo next() {
|
||||||
if (isIteratorForLevel) {
|
if (isIteratorForLevel) {
|
||||||
return iterators.get(0).next();
|
return iterators.get(0).next();
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,8 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
import org.apache.hadoop.hdfs.util.EnumCounters;
|
import org.apache.hadoop.hdfs.util.EnumCounters;
|
||||||
@ -148,8 +150,8 @@ static boolean setReplication(
|
|||||||
}
|
}
|
||||||
|
|
||||||
final short[] blockRepls = new short[2]; // 0: old, 1: new
|
final short[] blockRepls = new short[2]; // 0: old, 1: new
|
||||||
final Block[] blocks = unprotectedSetReplication(fsd, src, replication,
|
final BlockInfoContiguous[] blocks = unprotectedSetReplication(fsd, src,
|
||||||
blockRepls);
|
replication, blockRepls);
|
||||||
isFile = blocks != null;
|
isFile = blocks != null;
|
||||||
if (isFile) {
|
if (isFile) {
|
||||||
fsd.getEditLog().logSetReplication(src, replication);
|
fsd.getEditLog().logSetReplication(src, replication);
|
||||||
@ -375,7 +377,7 @@ static INodeDirectory unprotectedSetQuota(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static Block[] unprotectedSetReplication(
|
static BlockInfoContiguous[] unprotectedSetReplication(
|
||||||
FSDirectory fsd, String src, short replication, short[] blockRepls)
|
FSDirectory fsd, String src, short replication, short[] blockRepls)
|
||||||
throws QuotaExceededException, UnresolvedLinkException,
|
throws QuotaExceededException, UnresolvedLinkException,
|
||||||
SnapshotAccessControlException {
|
SnapshotAccessControlException {
|
||||||
@ -410,7 +412,7 @@ static Block[] unprotectedSetReplication(
|
|||||||
blockRepls[0] = oldBR;
|
blockRepls[0] = oldBR;
|
||||||
blockRepls[1] = newBR;
|
blockRepls[1] = newBR;
|
||||||
}
|
}
|
||||||
return file.getBlocks();
|
return file.getContiguousBlocks();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void unprotectedSetStoragePolicy(
|
static void unprotectedSetStoragePolicy(
|
||||||
|
@ -3211,8 +3211,8 @@ FSPermissionChecker getPermissionChecker()
|
|||||||
* of blocks that need to be removed from blocksMap
|
* of blocks that need to be removed from blocksMap
|
||||||
*/
|
*/
|
||||||
void removeBlocks(BlocksMapUpdateInfo blocks) {
|
void removeBlocks(BlocksMapUpdateInfo blocks) {
|
||||||
List<Block> toDeleteList = blocks.getToDeleteList();
|
List<BlockInfo> toDeleteList = blocks.getToDeleteList();
|
||||||
Iterator<Block> iter = toDeleteList.iterator();
|
Iterator<BlockInfo> iter = toDeleteList.iterator();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
@ -3268,12 +3268,11 @@ void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
|
|||||||
boolean trackBlockCounts = isSafeModeTrackingBlocks();
|
boolean trackBlockCounts = isSafeModeTrackingBlocks();
|
||||||
int numRemovedComplete = 0, numRemovedSafe = 0;
|
int numRemovedComplete = 0, numRemovedSafe = 0;
|
||||||
|
|
||||||
for (Block b : blocks.getToDeleteList()) {
|
for (BlockInfo b : blocks.getToDeleteList()) {
|
||||||
if (trackBlockCounts) {
|
if (trackBlockCounts) {
|
||||||
BlockInfo bi = getStoredBlock(b);
|
if (b.isComplete()) {
|
||||||
if (bi.isComplete()) {
|
|
||||||
numRemovedComplete++;
|
numRemovedComplete++;
|
||||||
if (blockManager.checkMinStorage(bi, bi.numNodes())) {
|
if (blockManager.checkMinStorage(b, b.numNodes())) {
|
||||||
numRemovedSafe++;
|
numRemovedSafe++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -4185,7 +4184,8 @@ private void clearCorruptLazyPersistFiles()
|
|||||||
boolean changed = false;
|
boolean changed = false;
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
final Iterator<Block> it = blockManager.getCorruptReplicaBlockIterator();
|
final Iterator<BlockInfo> it =
|
||||||
|
blockManager.getCorruptReplicaBlockIterator();
|
||||||
|
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
Block b = it.next();
|
Block b = it.next();
|
||||||
@ -5126,7 +5126,7 @@ public void incrementSafeBlockCount(int replication) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void decrementSafeBlockCount(Block b) {
|
public void decrementSafeBlockCount(BlockInfo b) {
|
||||||
// safeMode is volatile, and may be set to null at any time
|
// safeMode is volatile, and may be set to null at any time
|
||||||
SafeModeInfo safeMode = this.safeMode;
|
SafeModeInfo safeMode = this.safeMode;
|
||||||
if (safeMode == null) // mostly true
|
if (safeMode == null) // mostly true
|
||||||
@ -5956,7 +5956,8 @@ Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
|
|||||||
}
|
}
|
||||||
// print a limited # of corrupt files per call
|
// print a limited # of corrupt files per call
|
||||||
|
|
||||||
final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator();
|
final Iterator<BlockInfo> blkIterator =
|
||||||
|
blockManager.getCorruptReplicaBlockIterator();
|
||||||
|
|
||||||
int skip = getIntCookie(cookieTab[0]);
|
int skip = getIntCookie(cookieTab[0]);
|
||||||
for (int i = 0; i < skip && blkIterator.hasNext(); i++) {
|
for (int i = 0; i < skip && blkIterator.hasNext(); i++) {
|
||||||
@ -5964,7 +5965,7 @@ Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
|
|||||||
}
|
}
|
||||||
|
|
||||||
while (blkIterator.hasNext()) {
|
while (blkIterator.hasNext()) {
|
||||||
Block blk = blkIterator.next();
|
BlockInfo blk = blkIterator.next();
|
||||||
final INode inode = (INode)blockManager.getBlockCollection(blk);
|
final INode inode = (INode)blockManager.getBlockCollection(blk);
|
||||||
skip++;
|
skip++;
|
||||||
if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
|
if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
|
||||||
|
@ -34,9 +34,9 @@
|
|||||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
|
import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
|
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
|
||||||
@ -950,7 +950,7 @@ public static class BlocksMapUpdateInfo {
|
|||||||
/**
|
/**
|
||||||
* The list of blocks that need to be removed from blocksMap
|
* The list of blocks that need to be removed from blocksMap
|
||||||
*/
|
*/
|
||||||
private final List<Block> toDeleteList;
|
private final List<BlockInfo> toDeleteList;
|
||||||
|
|
||||||
public BlocksMapUpdateInfo() {
|
public BlocksMapUpdateInfo() {
|
||||||
toDeleteList = new ChunkedArrayList<>();
|
toDeleteList = new ChunkedArrayList<>();
|
||||||
@ -959,7 +959,7 @@ public BlocksMapUpdateInfo() {
|
|||||||
/**
|
/**
|
||||||
* @return The list of blocks that need to be removed from blocksMap
|
* @return The list of blocks that need to be removed from blocksMap
|
||||||
*/
|
*/
|
||||||
public List<Block> getToDeleteList() {
|
public List<BlockInfo> getToDeleteList() {
|
||||||
return toDeleteList;
|
return toDeleteList;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -968,12 +968,12 @@ public List<Block> getToDeleteList() {
|
|||||||
* {@link BlocksMapUpdateInfo#toDeleteList}
|
* {@link BlocksMapUpdateInfo#toDeleteList}
|
||||||
* @param toDelete the to-be-deleted block
|
* @param toDelete the to-be-deleted block
|
||||||
*/
|
*/
|
||||||
public void addDeleteBlock(Block toDelete) {
|
public void addDeleteBlock(BlockInfo toDelete) {
|
||||||
assert toDelete != null : "toDelete is null";
|
assert toDelete != null : "toDelete is null";
|
||||||
toDeleteList.add(toDelete);
|
toDeleteList.add(toDelete);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeDeleteBlock(Block block) {
|
public void removeDeleteBlock(BlockInfo block) {
|
||||||
assert block != null : "block is null";
|
assert block != null : "block is null";
|
||||||
toDeleteList.remove(block);
|
toDeleteList.remove(block);
|
||||||
}
|
}
|
||||||
|
@ -1086,8 +1086,8 @@ void excludeSnapshotBlocks(int snapshotId,
|
|||||||
getDiffs().findEarlierSnapshotBlocks(snapshotId);
|
getDiffs().findEarlierSnapshotBlocks(snapshotId);
|
||||||
if(snapshotBlocks == null)
|
if(snapshotBlocks == null)
|
||||||
return;
|
return;
|
||||||
List<Block> toDelete = collectedBlocks.getToDeleteList();
|
List<BlockInfo> toDelete = collectedBlocks.getToDeleteList();
|
||||||
for(Block blk : snapshotBlocks) {
|
for(BlockInfo blk : snapshotBlocks) {
|
||||||
if(toDelete.contains(blk))
|
if(toDelete.contains(blk))
|
||||||
collectedBlocks.removeDeleteBlock(blk);
|
collectedBlocks.removeDeleteBlock(blk);
|
||||||
}
|
}
|
||||||
|
@ -252,7 +252,7 @@ public void blockIdCK(String blockId) {
|
|||||||
}
|
}
|
||||||
BlockCollection bc = bm.getBlockCollection(blockInfo);
|
BlockCollection bc = bm.getBlockCollection(blockInfo);
|
||||||
INode iNode = (INode) bc;
|
INode iNode = (INode) bc;
|
||||||
NumberReplicas numberReplicas= bm.countNodes(block);
|
NumberReplicas numberReplicas= bm.countNodes(blockInfo);
|
||||||
out.println("Block Id: " + blockId);
|
out.println("Block Id: " + blockId);
|
||||||
out.println("Block belongs to: "+iNode.getFullPathName());
|
out.println("Block belongs to: "+iNode.getFullPathName());
|
||||||
out.println("No. of Expected Replica: " +
|
out.println("No. of Expected Replica: " +
|
||||||
@ -481,7 +481,7 @@ private LocatedBlocks getBlockLocations(String path, HdfsFileStatus file)
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
long fileLen = file.getLen();
|
long fileLen = file.getLen();
|
||||||
LocatedBlocks blocks = null;
|
LocatedBlocks blocks = null;
|
||||||
FSNamesystem fsn = namenode.getNamesystem();
|
final FSNamesystem fsn = namenode.getNamesystem();
|
||||||
fsn.readLock();
|
fsn.readLock();
|
||||||
try {
|
try {
|
||||||
blocks = fsn.getBlockLocations(
|
blocks = fsn.getBlockLocations(
|
||||||
@ -540,8 +540,10 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
|
|||||||
ExtendedBlock block = lBlk.getBlock();
|
ExtendedBlock block = lBlk.getBlock();
|
||||||
BlockManager bm = namenode.getNamesystem().getBlockManager();
|
BlockManager bm = namenode.getNamesystem().getBlockManager();
|
||||||
|
|
||||||
|
final BlockInfo storedBlock = bm.getStoredBlock(
|
||||||
|
block.getLocalBlock());
|
||||||
// count decommissionedReplicas / decommissioningReplicas
|
// count decommissionedReplicas / decommissioningReplicas
|
||||||
NumberReplicas numberReplicas = bm.countNodes(block.getLocalBlock());
|
NumberReplicas numberReplicas = bm.countNodes(storedBlock);
|
||||||
int decommissionedReplicas = numberReplicas.decommissioned();;
|
int decommissionedReplicas = numberReplicas.decommissioned();;
|
||||||
int decommissioningReplicas = numberReplicas.decommissioning();
|
int decommissioningReplicas = numberReplicas.decommissioning();
|
||||||
res.decommissionedReplicas += decommissionedReplicas;
|
res.decommissionedReplicas += decommissionedReplicas;
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
|
|
||||||
/** SafeMode related operations. */
|
/** SafeMode related operations. */
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@ -49,5 +50,5 @@ public interface SafeMode {
|
|||||||
public void incrementSafeBlockCount(int replication);
|
public void incrementSafeBlockCount(int replication);
|
||||||
|
|
||||||
/** Decrement number of blocks that reached minimal replication. */
|
/** Decrement number of blocks that reached minimal replication. */
|
||||||
public void decrementSafeBlockCount(Block b);
|
public void decrementSafeBlockCount(BlockInfo b);
|
||||||
}
|
}
|
||||||
|
@ -69,9 +69,10 @@ public static int[] getReplicaInfo(final FSNamesystem namesystem, final Block b)
|
|||||||
final BlockManager bm = namesystem.getBlockManager();
|
final BlockManager bm = namesystem.getBlockManager();
|
||||||
namesystem.readLock();
|
namesystem.readLock();
|
||||||
try {
|
try {
|
||||||
|
final BlockInfo storedBlock = bm.getStoredBlock(b);
|
||||||
return new int[]{getNumberOfRacks(bm, b),
|
return new int[]{getNumberOfRacks(bm, b),
|
||||||
bm.countNodes(b).liveReplicas(),
|
bm.countNodes(storedBlock).liveReplicas(),
|
||||||
bm.neededReplications.contains(b) ? 1 : 0};
|
bm.neededReplications.contains(storedBlock) ? 1 : 0};
|
||||||
} finally {
|
} finally {
|
||||||
namesystem.readUnlock();
|
namesystem.readUnlock();
|
||||||
}
|
}
|
||||||
|
@ -440,14 +440,14 @@ private BlockInfoContiguous addBlockOnNodes(long blockId, List<DatanodeDescripto
|
|||||||
return blockInfo;
|
return blockInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
private DatanodeStorageInfo[] scheduleSingleReplication(Block block) {
|
private DatanodeStorageInfo[] scheduleSingleReplication(BlockInfo block) {
|
||||||
// list for priority 1
|
// list for priority 1
|
||||||
List<Block> list_p1 = new ArrayList<Block>();
|
List<BlockInfo> list_p1 = new ArrayList<>();
|
||||||
list_p1.add(block);
|
list_p1.add(block);
|
||||||
|
|
||||||
// list of lists for each priority
|
// list of lists for each priority
|
||||||
List<List<Block>> list_all = new ArrayList<List<Block>>();
|
List<List<BlockInfo>> list_all = new ArrayList<>();
|
||||||
list_all.add(new ArrayList<Block>()); // for priority 0
|
list_all.add(new ArrayList<BlockInfo>()); // for priority 0
|
||||||
list_all.add(list_p1); // for priority 1
|
list_all.add(list_p1); // for priority 1
|
||||||
|
|
||||||
assertEquals("Block not initially pending replication", 0,
|
assertEquals("Block not initially pending replication", 0,
|
||||||
|
@ -166,10 +166,11 @@ void checkTimeout(String testLabel, long cycleTime) throws TimeoutException {
|
|||||||
|
|
||||||
/* threadsafe read of the replication counts for this block */
|
/* threadsafe read of the replication counts for this block */
|
||||||
NumberReplicas countNodes(Block block, FSNamesystem namesystem) {
|
NumberReplicas countNodes(Block block, FSNamesystem namesystem) {
|
||||||
|
BlockManager blockManager = namesystem.getBlockManager();
|
||||||
namesystem.readLock();
|
namesystem.readLock();
|
||||||
try {
|
try {
|
||||||
lastBlock = block;
|
lastBlock = block;
|
||||||
lastNum = namesystem.getBlockManager().countNodes(block);
|
lastNum = blockManager.countNodes(blockManager.getStoredBlock(block));
|
||||||
return lastNum;
|
return lastNum;
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
@ -117,7 +117,8 @@ public void testProcesOverReplicateBlock() throws Exception {
|
|||||||
|
|
||||||
// corrupt one won't be chosen to be excess one
|
// corrupt one won't be chosen to be excess one
|
||||||
// without 4910 the number of live replicas would be 0: block gets lost
|
// without 4910 the number of live replicas would be 0: block gets lost
|
||||||
assertEquals(1, bm.countNodes(block.getLocalBlock()).liveReplicas());
|
assertEquals(1, bm.countNodes(
|
||||||
|
bm.getStoredBlock(block.getLocalBlock())).liveReplicas());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
namesystem.writeUnlock();
|
namesystem.writeUnlock();
|
||||||
@ -219,7 +220,7 @@ public void testInvalidateOverReplicatedBlock() throws Exception {
|
|||||||
out.close();
|
out.close();
|
||||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, p);
|
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, p);
|
||||||
assertEquals("Expected only one live replica for the block", 1, bm
|
assertEquals("Expected only one live replica for the block", 1, bm
|
||||||
.countNodes(block.getLocalBlock()).liveReplicas());
|
.countNodes(bm.getStoredBlock(block.getLocalBlock())).liveReplicas());
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -53,6 +53,12 @@ public class TestPendingReplication {
|
|||||||
private static final int DFS_REPLICATION_INTERVAL = 1;
|
private static final int DFS_REPLICATION_INTERVAL = 1;
|
||||||
// Number of datanodes in the cluster
|
// Number of datanodes in the cluster
|
||||||
private static final int DATANODE_COUNT = 5;
|
private static final int DATANODE_COUNT = 5;
|
||||||
|
|
||||||
|
private BlockInfo genBlockInfo(long id, long length, long gs) {
|
||||||
|
return new BlockInfoContiguous(new Block(id, length, gs),
|
||||||
|
(short) DATANODE_COUNT);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPendingReplication() {
|
public void testPendingReplication() {
|
||||||
PendingReplicationBlocks pendingReplications;
|
PendingReplicationBlocks pendingReplications;
|
||||||
@ -63,7 +69,7 @@ public void testPendingReplication() {
|
|||||||
//
|
//
|
||||||
DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10);
|
DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10);
|
||||||
for (int i = 0; i < storages.length; i++) {
|
for (int i = 0; i < storages.length; i++) {
|
||||||
Block block = new Block(i, i, 0);
|
BlockInfo block = genBlockInfo(i, i, 0);
|
||||||
DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
|
DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
|
||||||
System.arraycopy(storages, 0, targets, 0, i);
|
System.arraycopy(storages, 0, targets, 0, i);
|
||||||
pendingReplications.increment(block,
|
pendingReplications.increment(block,
|
||||||
@ -76,7 +82,7 @@ public void testPendingReplication() {
|
|||||||
//
|
//
|
||||||
// remove one item and reinsert it
|
// remove one item and reinsert it
|
||||||
//
|
//
|
||||||
Block blk = new Block(8, 8, 0);
|
BlockInfo blk = genBlockInfo(8, 8, 0);
|
||||||
pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
|
pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
|
||||||
assertEquals("pendingReplications.getNumReplicas ",
|
assertEquals("pendingReplications.getNumReplicas ",
|
||||||
7, pendingReplications.getNumReplicas(blk));
|
7, pendingReplications.getNumReplicas(blk));
|
||||||
@ -96,7 +102,7 @@ public void testPendingReplication() {
|
|||||||
// are sane.
|
// are sane.
|
||||||
//
|
//
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
Block block = new Block(i, i, 0);
|
BlockInfo block = genBlockInfo(i, i, 0);
|
||||||
int numReplicas = pendingReplications.getNumReplicas(block);
|
int numReplicas = pendingReplications.getNumReplicas(block);
|
||||||
assertTrue(numReplicas == i);
|
assertTrue(numReplicas == i);
|
||||||
}
|
}
|
||||||
@ -115,7 +121,7 @@ public void testPendingReplication() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 10; i < 15; i++) {
|
for (int i = 10; i < 15; i++) {
|
||||||
Block block = new Block(i, i, 0);
|
BlockInfo block = genBlockInfo(i, i, 0);
|
||||||
pendingReplications.increment(block,
|
pendingReplications.increment(block,
|
||||||
DatanodeStorageInfo.toDatanodeDescriptors(
|
DatanodeStorageInfo.toDatanodeDescriptors(
|
||||||
DFSTestUtil.createDatanodeStorageInfos(i)));
|
DFSTestUtil.createDatanodeStorageInfos(i)));
|
||||||
@ -275,7 +281,7 @@ public void testBlockReceived() throws Exception {
|
|||||||
|
|
||||||
assertEquals(1, blkManager.pendingReplications.size());
|
assertEquals(1, blkManager.pendingReplications.size());
|
||||||
INodeFile fileNode = fsn.getFSDirectory().getINode4Write(file).asFile();
|
INodeFile fileNode = fsn.getFSDirectory().getINode4Write(file).asFile();
|
||||||
Block[] blocks = fileNode.getBlocks();
|
BlockInfo[] blocks = fileNode.getBlocks();
|
||||||
assertEquals(DATANODE_COUNT - 1,
|
assertEquals(DATANODE_COUNT - 1,
|
||||||
blkManager.pendingReplications.getNumReplicas(blocks[0]));
|
blkManager.pendingReplications.getNumReplicas(blocks[0]));
|
||||||
|
|
||||||
@ -381,8 +387,8 @@ public void testPendingAndInvalidate() throws Exception {
|
|||||||
BlockManagerTestUtil.computeAllPendingWork(bm);
|
BlockManagerTestUtil.computeAllPendingWork(bm);
|
||||||
BlockManagerTestUtil.updateState(bm);
|
BlockManagerTestUtil.updateState(bm);
|
||||||
assertEquals(bm.getPendingReplicationBlocksCount(), 1L);
|
assertEquals(bm.getPendingReplicationBlocksCount(), 1L);
|
||||||
assertEquals(bm.pendingReplications.getNumReplicas(block.getBlock()
|
BlockInfo storedBlock = bm.getStoredBlock(block.getBlock().getLocalBlock());
|
||||||
.getLocalBlock()), 2);
|
assertEquals(bm.pendingReplications.getNumReplicas(storedBlock), 2);
|
||||||
|
|
||||||
// 4. delete the file
|
// 4. delete the file
|
||||||
fs.delete(filePath, true);
|
fs.delete(filePath, true);
|
||||||
|
@ -58,7 +58,9 @@ public class TestRBWBlockInvalidation {
|
|||||||
|
|
||||||
private static NumberReplicas countReplicas(final FSNamesystem namesystem,
|
private static NumberReplicas countReplicas(final FSNamesystem namesystem,
|
||||||
ExtendedBlock block) {
|
ExtendedBlock block) {
|
||||||
return namesystem.getBlockManager().countNodes(block.getLocalBlock());
|
final BlockManager blockManager = namesystem.getBlockManager();
|
||||||
|
return blockManager.countNodes(blockManager.getStoredBlock(
|
||||||
|
block.getLocalBlock()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -832,6 +832,10 @@ public void testRereplicate3() throws Exception {
|
|||||||
assertTrue(isOnSameRack(targets[0], dataNodes[2]));
|
assertTrue(isOnSameRack(targets[0], dataNodes[2]));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private BlockInfo genBlockInfo(long id) {
|
||||||
|
return new BlockInfoContiguous(new Block(id), (short) 3);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test for the high priority blocks are processed before the low priority
|
* Test for the high priority blocks are processed before the low priority
|
||||||
* blocks.
|
* blocks.
|
||||||
@ -850,16 +854,17 @@ public void testReplicationWithPriority() throws Exception {
|
|||||||
.getNamesystem().getBlockManager().neededReplications;
|
.getNamesystem().getBlockManager().neededReplications;
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
// Adding the blocks directly to normal priority
|
// Adding the blocks directly to normal priority
|
||||||
neededReplications.add(new Block(ThreadLocalRandom.current()
|
|
||||||
.nextLong()), 2, 0, 3);
|
neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
|
||||||
|
nextLong()), 2, 0, 3);
|
||||||
}
|
}
|
||||||
// Lets wait for the replication interval, to start process normal
|
// Lets wait for the replication interval, to start process normal
|
||||||
// priority blocks
|
// priority blocks
|
||||||
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
|
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
|
||||||
|
|
||||||
// Adding the block directly to high priority list
|
// Adding the block directly to high priority list
|
||||||
neededReplications.add(new Block(ThreadLocalRandom.current().nextLong()),
|
neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
|
||||||
1, 0, 3);
|
nextLong()), 1, 0, 3);
|
||||||
|
|
||||||
// Lets wait for the replication interval
|
// Lets wait for the replication interval
|
||||||
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
|
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
|
||||||
@ -882,30 +887,31 @@ public void testChooseUnderReplicatedBlocks() throws Exception {
|
|||||||
|
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
// Adding QUEUE_HIGHEST_PRIORITY block
|
// Adding QUEUE_HIGHEST_PRIORITY block
|
||||||
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current()
|
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
|
||||||
.nextLong()), 1, 0, 3);
|
nextLong()), 1, 0, 3);
|
||||||
|
|
||||||
// Adding QUEUE_VERY_UNDER_REPLICATED block
|
// Adding QUEUE_VERY_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current()
|
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
|
||||||
.nextLong()), 2, 0, 7);
|
nextLong()), 2, 0, 7);
|
||||||
|
|
||||||
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
|
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
|
||||||
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current()
|
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
|
||||||
.nextLong()), 6, 0, 6);
|
nextLong()), 6, 0, 6);
|
||||||
|
|
||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current()
|
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
|
||||||
.nextLong()), 5, 0, 6);
|
nextLong()), 5, 0, 6);
|
||||||
|
|
||||||
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
|
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
|
||||||
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current()
|
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
|
||||||
.nextLong()), 0, 0, 3);
|
nextLong()), 0, 0, 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
|
// Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
|
||||||
// from
|
// from
|
||||||
// QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED.
|
// QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED.
|
||||||
List<List<Block>> chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(6);
|
List<List<BlockInfo>> chosenBlocks =
|
||||||
|
underReplicatedBlocks.chooseUnderReplicatedBlocks(6);
|
||||||
assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);
|
assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);
|
||||||
|
|
||||||
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from
|
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from
|
||||||
@ -915,8 +921,8 @@ public void testChooseUnderReplicatedBlocks() throws Exception {
|
|||||||
assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
|
assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
|
||||||
|
|
||||||
// Adding QUEUE_HIGHEST_PRIORITY
|
// Adding QUEUE_HIGHEST_PRIORITY
|
||||||
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current().nextLong()),
|
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
|
||||||
1, 0, 3);
|
nextLong()), 1, 0, 3);
|
||||||
|
|
||||||
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
|
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
|
||||||
// QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
|
// QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
|
||||||
@ -934,7 +940,7 @@ public void testChooseUnderReplicatedBlocks() throws Exception {
|
|||||||
|
|
||||||
/** asserts the chosen blocks with expected priority blocks */
|
/** asserts the chosen blocks with expected priority blocks */
|
||||||
private void assertTheChosenBlocks(
|
private void assertTheChosenBlocks(
|
||||||
List<List<Block>> chosenBlocks, int firstPrioritySize,
|
List<List<BlockInfo>> chosenBlocks, int firstPrioritySize,
|
||||||
int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
|
int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
|
||||||
int fifthPrioritySize) {
|
int fifthPrioritySize) {
|
||||||
assertEquals(
|
assertEquals(
|
||||||
@ -1108,9 +1114,9 @@ public void testGetReplWorkMultiplier() {
|
|||||||
public void testUpdateDoesNotCauseSkippedReplication() {
|
public void testUpdateDoesNotCauseSkippedReplication() {
|
||||||
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
|
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
|
||||||
|
|
||||||
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
|
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
|
||||||
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
|
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
|
||||||
Block block3 = new Block(ThreadLocalRandom.current().nextLong());
|
BlockInfo block3 = genBlockInfo(ThreadLocalRandom.current().nextLong());
|
||||||
|
|
||||||
// Adding QUEUE_VERY_UNDER_REPLICATED block
|
// Adding QUEUE_VERY_UNDER_REPLICATED block
|
||||||
final int block1CurReplicas = 2;
|
final int block1CurReplicas = 2;
|
||||||
@ -1124,7 +1130,7 @@ public void testUpdateDoesNotCauseSkippedReplication() {
|
|||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(block3, 2, 0, 6);
|
underReplicatedBlocks.add(block3, 2, 0, 6);
|
||||||
|
|
||||||
List<List<Block>> chosenBlocks;
|
List<List<BlockInfo>> chosenBlocks;
|
||||||
|
|
||||||
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
||||||
// from QUEUE_VERY_UNDER_REPLICATED.
|
// from QUEUE_VERY_UNDER_REPLICATED.
|
||||||
@ -1157,8 +1163,8 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
|||||||
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
||||||
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
||||||
|
|
||||||
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
|
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
|
||||||
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
|
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
|
||||||
|
|
||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(block1, 0, 1, 1);
|
underReplicatedBlocks.add(block1, 0, 1, 1);
|
||||||
@ -1166,7 +1172,7 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
|||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(block2, 0, 1, 1);
|
underReplicatedBlocks.add(block2, 0, 1, 1);
|
||||||
|
|
||||||
List<List<Block>> chosenBlocks;
|
List<List<BlockInfo>> chosenBlocks;
|
||||||
|
|
||||||
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
||||||
// from QUEUE_VERY_UNDER_REPLICATED.
|
// from QUEUE_VERY_UNDER_REPLICATED.
|
||||||
@ -1203,8 +1209,13 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
|||||||
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
||||||
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
|
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
|
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
|
=======
|
||||||
|
BlockInfo block1 = genBlockInfo(random.nextLong());
|
||||||
|
BlockInfo block2 = genBlockInfo(random.nextLong());
|
||||||
|
>>>>>>> 3e6f458... HDFS-7912. Erasure Coding: track BlockInfo instead of Block in UnderReplicatedBlocks and PendingReplicationBlocks. Contributed by Jing Zhao.
|
||||||
|
|
||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(block1, 0, 1, 1);
|
underReplicatedBlocks.add(block1, 0, 1, 1);
|
||||||
@ -1212,7 +1223,7 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
|||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(block2, 0, 1, 1);
|
underReplicatedBlocks.add(block2, 0, 1, 1);
|
||||||
|
|
||||||
List<List<Block>> chosenBlocks;
|
List<List<BlockInfo>> chosenBlocks;
|
||||||
|
|
||||||
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
||||||
// from QUEUE_VERY_UNDER_REPLICATED.
|
// from QUEUE_VERY_UNDER_REPLICATED.
|
||||||
@ -1266,8 +1277,13 @@ public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
|
|||||||
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
||||||
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
|
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
|
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
|
||||||
|
=======
|
||||||
|
BlockInfo block1 = genBlockInfo(random.nextLong());
|
||||||
|
BlockInfo block2 = genBlockInfo(random.nextLong());
|
||||||
|
>>>>>>> 3e6f458... HDFS-7912. Erasure Coding: track BlockInfo instead of Block in UnderReplicatedBlocks and PendingReplicationBlocks. Contributed by Jing Zhao.
|
||||||
|
|
||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(block1, 0, 1, 1);
|
underReplicatedBlocks.add(block1, 0, 1, 1);
|
||||||
@ -1275,14 +1291,14 @@ public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
|
|||||||
// Adding QUEUE_UNDER_REPLICATED block
|
// Adding QUEUE_UNDER_REPLICATED block
|
||||||
underReplicatedBlocks.add(block2, 0, 1, 1);
|
underReplicatedBlocks.add(block2, 0, 1, 1);
|
||||||
|
|
||||||
List<List<Block>> chosenBlocks;
|
List<List<BlockInfo>> chosenBlocks;
|
||||||
|
|
||||||
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
||||||
// from QUEUE_VERY_UNDER_REPLICATED.
|
// from QUEUE_VERY_UNDER_REPLICATED.
|
||||||
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
|
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
|
||||||
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
|
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
|
||||||
|
|
||||||
bm.setReplication((short)0, (short)1, "", block1);
|
bm.setReplication((short)0, (short)1, "", (BlockInfoContiguous) block1);
|
||||||
|
|
||||||
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
||||||
// from QUEUE_VERY_UNDER_REPLICATED.
|
// from QUEUE_VERY_UNDER_REPLICATED.
|
||||||
|
@ -28,6 +28,10 @@
|
|||||||
|
|
||||||
public class TestUnderReplicatedBlockQueues {
|
public class TestUnderReplicatedBlockQueues {
|
||||||
|
|
||||||
|
private BlockInfo genBlockInfo(long id) {
|
||||||
|
return new BlockInfoContiguous(new Block(id), (short) 3);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that adding blocks with different replication counts puts them
|
* Test that adding blocks with different replication counts puts them
|
||||||
* into different queues
|
* into different queues
|
||||||
@ -36,11 +40,11 @@ public class TestUnderReplicatedBlockQueues {
|
|||||||
@Test
|
@Test
|
||||||
public void testBlockPriorities() throws Throwable {
|
public void testBlockPriorities() throws Throwable {
|
||||||
UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
|
UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
|
||||||
Block block1 = new Block(1);
|
BlockInfo block1 = genBlockInfo(1);
|
||||||
Block block2 = new Block(2);
|
BlockInfo block2 = genBlockInfo(2);
|
||||||
Block block_very_under_replicated = new Block(3);
|
BlockInfo block_very_under_replicated = genBlockInfo(3);
|
||||||
Block block_corrupt = new Block(4);
|
BlockInfo block_corrupt = genBlockInfo(4);
|
||||||
Block block_corrupt_repl_one = new Block(5);
|
BlockInfo block_corrupt_repl_one = genBlockInfo(5);
|
||||||
|
|
||||||
//add a block with a single entry
|
//add a block with a single entry
|
||||||
assertAdded(queues, block1, 1, 0, 3);
|
assertAdded(queues, block1, 1, 0, 3);
|
||||||
@ -82,7 +86,7 @@ public void testBlockPriorities() throws Throwable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void assertAdded(UnderReplicatedBlocks queues,
|
private void assertAdded(UnderReplicatedBlocks queues,
|
||||||
Block block,
|
BlockInfo block,
|
||||||
int curReplicas,
|
int curReplicas,
|
||||||
int decomissionedReplicas,
|
int decomissionedReplicas,
|
||||||
int expectedReplicas) {
|
int expectedReplicas) {
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||||
@ -81,6 +82,7 @@ public class TestReadOnlySharedStorage {
|
|||||||
private DatanodeInfo readOnlyDataNode;
|
private DatanodeInfo readOnlyDataNode;
|
||||||
|
|
||||||
private Block block;
|
private Block block;
|
||||||
|
private BlockInfo storedBlock;
|
||||||
|
|
||||||
private ExtendedBlock extendedBlock;
|
private ExtendedBlock extendedBlock;
|
||||||
|
|
||||||
@ -132,6 +134,7 @@ public void setup() throws IOException, InterruptedException {
|
|||||||
LocatedBlock locatedBlock = getLocatedBlock();
|
LocatedBlock locatedBlock = getLocatedBlock();
|
||||||
extendedBlock = locatedBlock.getBlock();
|
extendedBlock = locatedBlock.getBlock();
|
||||||
block = extendedBlock.getLocalBlock();
|
block = extendedBlock.getLocalBlock();
|
||||||
|
storedBlock = blockManager.getStoredBlock(block);
|
||||||
|
|
||||||
assertThat(locatedBlock.getLocations().length, is(1));
|
assertThat(locatedBlock.getLocations().length, is(1));
|
||||||
normalDataNode = locatedBlock.getLocations()[0];
|
normalDataNode = locatedBlock.getLocations()[0];
|
||||||
@ -188,7 +191,7 @@ private void validateStorageState(StorageReport[] storageReports, DatanodeStorag
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void validateNumberReplicas(int expectedReplicas) throws IOException {
|
private void validateNumberReplicas(int expectedReplicas) throws IOException {
|
||||||
NumberReplicas numberReplicas = blockManager.countNodes(block);
|
NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
|
||||||
assertThat(numberReplicas.liveReplicas(), is(expectedReplicas));
|
assertThat(numberReplicas.liveReplicas(), is(expectedReplicas));
|
||||||
assertThat(numberReplicas.excessReplicas(), is(0));
|
assertThat(numberReplicas.excessReplicas(), is(0));
|
||||||
assertThat(numberReplicas.corruptReplicas(), is(0));
|
assertThat(numberReplicas.corruptReplicas(), is(0));
|
||||||
@ -230,7 +233,7 @@ public void testNormalReplicaOffline() throws Exception {
|
|||||||
cluster.getNameNode(), normalDataNode.getXferAddr());
|
cluster.getNameNode(), normalDataNode.getXferAddr());
|
||||||
|
|
||||||
// The live replica count should now be zero (since the NORMAL replica is offline)
|
// The live replica count should now be zero (since the NORMAL replica is offline)
|
||||||
NumberReplicas numberReplicas = blockManager.countNodes(block);
|
NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
|
||||||
assertThat(numberReplicas.liveReplicas(), is(0));
|
assertThat(numberReplicas.liveReplicas(), is(0));
|
||||||
|
|
||||||
// The block should be reported as under-replicated
|
// The block should be reported as under-replicated
|
||||||
@ -263,7 +266,7 @@ public void testReadOnlyReplicaCorrupt() throws Exception {
|
|||||||
waitForLocations(1);
|
waitForLocations(1);
|
||||||
|
|
||||||
// However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count
|
// However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count
|
||||||
NumberReplicas numberReplicas = blockManager.countNodes(block);
|
NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
|
||||||
assertThat(numberReplicas.corruptReplicas(), is(0));
|
assertThat(numberReplicas.corruptReplicas(), is(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
|
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -260,7 +261,9 @@ public void testWithAllCorruptReplicas() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static NumberReplicas countReplicas(final FSNamesystem namesystem, ExtendedBlock block) {
|
private static NumberReplicas countReplicas(final FSNamesystem namesystem, ExtendedBlock block) {
|
||||||
return namesystem.getBlockManager().countNodes(block.getLocalBlock());
|
final BlockManager blockManager = namesystem.getBlockManager();
|
||||||
|
return blockManager.countNodes(blockManager.getStoredBlock(
|
||||||
|
block.getLocalBlock()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path fileName,
|
private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path fileName,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user