HDFS-5222. Move block schedule information from DatanodeDescriptor to DatanodeStorageInfo.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1526215 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-09-25 16:03:20 +00:00
parent 114aa229de
commit c6d09d8c0c
6 changed files with 72 additions and 66 deletions

View File

@ -30,3 +30,6 @@ IMPROVEMENTS:
HDFS-5232. Protocol changes to transmit StorageUuid. (Arpit Agarwal) HDFS-5232. Protocol changes to transmit StorageUuid. (Arpit Agarwal)
HDFS-5233. Use Datanode UUID to identify Datanodes. (Arpit Agarwal) HDFS-5233. Use Datanode UUID to identify Datanodes. (Arpit Agarwal)
HDFS-5222. Move block schedule information from DatanodeDescriptor to
DatanodeStorageInfo. (szetszwo)

View File

@ -1328,10 +1328,7 @@ public class BlockManager {
// Add block to the to be replicated list // Add block to the to be replicated list
rw.srcNode.addBlockToBeReplicated(block, targets); rw.srcNode.addBlockToBeReplicated(block, targets);
scheduledWork++; scheduledWork++;
DatanodeStorageInfo.incrementBlocksScheduled(targets);
for (DatanodeStorageInfo storage : targets) {
storage.getDatanodeDescriptor().incBlocksScheduled();
}
// Move the block-replication into a "pending" state. // Move the block-replication into a "pending" state.
// The reason we use 'pending' is so we can retry // The reason we use 'pending' is so we can retry
@ -2621,10 +2618,10 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
@VisibleForTesting @VisibleForTesting
void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint) void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
throws IOException { throws IOException {
// decrement number of blocks scheduled to this datanode. // Decrement number of blocks scheduled to this storage.
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
// RECEIVED_BLOCK), we currently also decrease the approximate number. // RECEIVED_BLOCK), we currently also decrease the approximate number.
node.decBlocksScheduled(); node.getStorageInfo(storageID).decrementBlocksScheduled();
// get the deletion hint node // get the deletion hint node
DatanodeDescriptor delHintNode = null; DatanodeDescriptor delHintNode = null;

View File

@ -620,18 +620,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
} }
final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE; final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
if (requiredSize > storage.getRemaining()) { final long scheduledSize = blockSize * storage.getBlocksScheduled();
if (requiredSize > storage.getRemaining() - scheduledSize) {
logNodeIsNotChosen(storage, "the storage does not have enough space "); logNodeIsNotChosen(storage, "the storage does not have enough space ");
return false; return false;
} }
//TODO: move getBlocksScheduled() to DatanodeStorageInfo.
long remaining = node.getRemaining() -
(node.getBlocksScheduled() * blockSize);
// check the remaining capacity of the target machine
if (requiredSize > remaining) {
logNodeIsNotChosen(storage, "the node does not have enough space ");
return false;
}
// check the communication traffic of the target machine // check the communication traffic of the target machine
if (considerLoad) { if (considerLoad) {

View File

@ -125,15 +125,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
/** A set of blocks to be invalidated by this datanode */ /** A set of blocks to be invalidated by this datanode */
private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>(); private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
/* Variables for maintaining number of blocks scheduled to be written to
* this datanode. This count is approximate and might be slightly bigger
* in case of errors (e.g. datanode does not report if an error occurs
* while writing the block).
*/
private int currApproxBlocksScheduled = 0;
private int prevApproxBlocksScheduled = 0;
private long lastBlocksScheduledRollTime = 0;
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
private int volumeFailures = 0; private int volumeFailures = 0;
/** /**
@ -313,9 +304,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
setLastUpdate(Time.now()); setLastUpdate(Time.now());
this.volumeFailures = volFailures; this.volumeFailures = volFailures;
for(DatanodeStorageInfo storage : getStorageInfos()) { for(DatanodeStorageInfo storage : getStorageInfos()) {
storage.receivedHeartbeat(); storage.receivedHeartbeat(getLastUpdate());
} }
rollBlocksScheduled(getLastUpdate());
} }
private static class BlockIterator implements Iterator<BlockInfo> { private static class BlockIterator implements Iterator<BlockInfo> {
@ -437,38 +427,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
* to this datanode. * to this datanode.
*/ */
public int getBlocksScheduled() { public int getBlocksScheduled() {
return currApproxBlocksScheduled + prevApproxBlocksScheduled; int n = 0;
} for(DatanodeStorageInfo storage : getStorageInfos()) {
n += storage.getBlocksScheduled();
/**
* Increments counter for number of blocks scheduled.
*/
public void incBlocksScheduled() {
currApproxBlocksScheduled++;
}
/**
* Decrements counter for number of blocks scheduled.
*/
void decBlocksScheduled() {
if (prevApproxBlocksScheduled > 0) {
prevApproxBlocksScheduled--;
} else if (currApproxBlocksScheduled > 0) {
currApproxBlocksScheduled--;
}
// its ok if both counters are zero.
}
/**
* Adjusts curr and prev number of blocks scheduled every few minutes.
*/
private void rollBlocksScheduled(long now) {
if ((now - lastBlocksScheduledRollTime) >
BLOCKS_SCHEDULED_ROLL_INTERVAL) {
prevApproxBlocksScheduled = currApproxBlocksScheduled;
currApproxBlocksScheduled = 0;
lastBlocksScheduledRollTime = now;
} }
return n;
} }
@Override @Override

View File

@ -92,9 +92,11 @@ public class DatanodeStorageInfo {
private final String storageID; private final String storageID;
private final StorageType storageType; private final StorageType storageType;
private State state; private State state;
private long capacity; private long capacity;
private long dfsUsed; private long dfsUsed;
private long remaining; private long remaining;
private volatile BlockInfo blockList = null; private volatile BlockInfo blockList = null;
private int numBlocks = 0; private int numBlocks = 0;
@ -117,6 +119,16 @@ public class DatanodeStorageInfo {
*/ */
private boolean blockContentsStale = true; private boolean blockContentsStale = true;
/* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger
* in case of errors (e.g. datanode does not report if an error occurs
* while writing the block).
*/
private int currApproxBlocksScheduled = 0;
private int prevApproxBlocksScheduled = 0;
private long lastBlocksScheduledRollTime = 0;
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
public DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) { public DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
this.dn = dn; this.dn = dn;
this.storageID = s.getStorageID(); this.storageID = s.getStorageID();
@ -132,27 +144,28 @@ public class DatanodeStorageInfo {
this.blockReportCount = blockReportCount; this.blockReportCount = blockReportCount;
} }
public boolean areBlockContentsStale() { boolean areBlockContentsStale() {
return blockContentsStale; return blockContentsStale;
} }
public void markStaleAfterFailover() { void markStaleAfterFailover() {
heartbeatedSinceFailover = false; heartbeatedSinceFailover = false;
blockContentsStale = true; blockContentsStale = true;
} }
public void receivedHeartbeat() { void receivedHeartbeat(final long lastUpdate) {
heartbeatedSinceFailover = true; heartbeatedSinceFailover = true;
rollBlocksScheduled(lastUpdate);
} }
public void receivedBlockReport() { void receivedBlockReport() {
if (heartbeatedSinceFailover) { if (heartbeatedSinceFailover) {
blockContentsStale = false; blockContentsStale = false;
} }
blockReportCount++; blockReportCount++;
} }
public void setUtilization(long capacity, long dfsUsed, long remaining) { void setUtilization(long capacity, long dfsUsed, long remaining) {
this.capacity = capacity; this.capacity = capacity;
this.dfsUsed = dfsUsed; this.dfsUsed = dfsUsed;
this.remaining = remaining; this.remaining = remaining;
@ -225,6 +238,45 @@ public class DatanodeStorageInfo {
return dn; return dn;
} }
/**
* Approximate count of blocks scheduled for write to this storage: the sum
* of the current and previous roll-interval counters, so a block scheduled
* just before a roll is still counted until the next roll.
* @return Approximate number of blocks currently scheduled to be written
* to this storage.
*/
int getBlocksScheduled() {
return currApproxBlocksScheduled + prevApproxBlocksScheduled;
}
/** Increment the number of blocks scheduled for each given storage */
// Convenience bulk form used by BlockManager/FSNamesystem when a block is
// allocated to a set of target storages in one step.
public static void incrementBlocksScheduled(DatanodeStorageInfo... storages) {
for (DatanodeStorageInfo s : storages) {
s.incrementBlocksScheduled();
}
}
/** Increment the number of blocks scheduled. */
// Private: callers go through the static varargs form above so the
// per-storage counter can only grow via a block-allocation event.
private void incrementBlocksScheduled() {
currApproxBlocksScheduled++;
}
/** Decrement the number of blocks scheduled. */
// Prefer draining the previous-interval counter first; the count is only
// approximate, so a decrement arriving after both counters were rolled to
// zero is silently ignored rather than going negative.
void decrementBlocksScheduled() {
if (prevApproxBlocksScheduled > 0) {
prevApproxBlocksScheduled--;
} else if (currApproxBlocksScheduled > 0) {
currApproxBlocksScheduled--;
}
// It's OK if both counters are zero.
}
/** Adjusts curr and prev number of blocks scheduled every few minutes. */
// Called from receivedHeartbeat(); once BLOCKS_SCHEDULED_ROLL_INTERVAL
// (10 min) has elapsed, the current counter becomes the previous one and a
// fresh current counter starts, aging out stale scheduled-block counts
// (e.g. writes the datanode never reported as completed).
private void rollBlocksScheduled(long now) {
if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
prevApproxBlocksScheduled = currApproxBlocksScheduled;
currApproxBlocksScheduled = 0;
lastBlocksScheduledRollTime = now;
}
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (this == obj) { if (this == obj) {

View File

@ -2868,9 +2868,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
BlockInfo b = dir.addBlock(src, inodesInPath, newBlock, targets); BlockInfo b = dir.addBlock(src, inodesInPath, newBlock, targets);
NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". " NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
+ getBlockPoolId() + " " + b); + getBlockPoolId() + " " + b);
for (DatanodeStorageInfo storage : targets) { DatanodeStorageInfo.incrementBlocksScheduled(targets);
storage.getDatanodeDescriptor().incBlocksScheduled();
}
return b; return b;
} }