HDFS-5452. Fix TestReplicationPolicy and TestBlocksScheduledCounter.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1538407 13f79535-47bb-0310-9956-ffa450edef68
parent cbdeba29bd
commit 26a1fda51e
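In short: the patch moves the approximate "blocks scheduled" bookkeeping from DatanodeStorageInfo back up to DatanodeDescriptor, counting in-flight writes per datanode rather than per storage. BlockManager#addBlock now decrements unconditionally instead of throwing on an unrecognized storageID, BlockPlacementPolicyDefault checks free space against the node-level count, and TestReplicationPolicy's log-level assertion is relaxed to accept WARN or below.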
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
@@ -60,3 +60,5 @@ IMPROVEMENTS:
 
     HDFS-5447. Fix TestJspHelper. (Arpit Agarwal)
 
+    HDFS-5452. Fix TestReplicationPolicy and TestBlocksScheduledCounter.
+    (szetszwo)
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -94,8 +94,7 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
   DatanodeStorageInfo getStorageInfo(int index) {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    DatanodeStorageInfo storage = (DatanodeStorageInfo)triplets[index*3];
-    return storage;
+    return (DatanodeStorageInfo)triplets[index*3];
   }
 
   private BlockInfo getPrevious(int index) {
@@ -118,7 +117,7 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
     return info;
   }
 
-  void setStorageInfo(int index, DatanodeStorageInfo storage) {
+  private void setStorageInfo(int index, DatanodeStorageInfo storage) {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
     triplets[index*3] = storage;
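Background on the index*3 arithmetic above (not part of the patch): BlockInfo keeps its per-replica bookkeeping in one flat Object[] triplets array, three slots per replica - the storage holding the replica, and the previous/next BlockInfo in that storage's block list. A minimal sketch of the layout; StorageRef and TripletsSketch are hypothetical stand-ins, not the real classes:

// Sketch only; StorageRef stands in for DatanodeStorageInfo, and the two
// unused slots per triplet would hold the prev/next BlockInfo references.
class StorageRef {}

class TripletsSketch {
  // One triplet per replica: [storage, prevBlockOnStorage, nextBlockOnStorage].
  private final Object[] triplets;

  TripletsSketch(int replication) {
    this.triplets = new Object[3 * replication];
  }

  StorageRef getStorageInfo(int index) {
    assert index >= 0 && index * 3 < triplets.length : "Index is out of bound";
    return (StorageRef) triplets[index * 3]; // slot 0 of the triplet
  }

  void setStorageInfo(int index, StorageRef storage) {
    assert index >= 0 && index * 3 < triplets.length : "Index is out of bound";
    triplets[index * 3] = storage;
  }
}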
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2628,17 +2628,10 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
   @VisibleForTesting
   void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
       throws IOException {
-    // Decrement number of blocks scheduled to this storage.
+    // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
     // RECEIVED_BLOCK), we currently also decrease the approximate number.
-    DatanodeStorageInfo storageInfo = node.getStorageInfo(storageID);
-    if (storageInfo != null) {
-      storageInfo.decrementBlocksScheduled();
-    } else {
-      throw new IllegalArgumentException(
-          "Unrecognized storageID " + storageID + " in block report " +
-          "from Datanode " + node.toString());
-    }
+    node.decrementBlocksScheduled();
 
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -620,9 +620,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
 
     final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
-    final long scheduledSize = blockSize = storage.getBlocksScheduled();
-    if (requiredSize > storage.getRemaining() - scheduledSize) {
-      logNodeIsNotChosen(storage, "the storage does not have enough space ");
+    final long scheduledSize = blockSize * node.getBlocksScheduled();
+    if (requiredSize > node.getRemaining() - scheduledSize) {
+      logNodeIsNotChosen(storage, "the node does not have enough space ");
       return false;
     }
 
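Two things happen in this hunk: the removed line contained what appears to be an accidental assignment (blockSize = storage.getBlocksScheduled() rather than a multiplication), and the check now reads remaining space and the scheduled count from the node instead of a single storage. A condensed sketch of the test as committed; hasEnoughSpace and its parameter names are illustrative, not the real BlockPlacementPolicyDefault API:

// Sketch only; in the real code blockSize comes from the caller, the factor
// from HdfsConstants.MIN_BLOCKS_FOR_WRITE, and the remaining/scheduled
// values from the DatanodeDescriptor.
class SpaceCheckSketch {
  static boolean hasEnoughSpace(long blockSize, int minBlocksForWrite,
      long remainingOnNode, int blocksScheduledOnNode) {
    final long requiredSize = blockSize * minBlocksForWrite;
    // Space assumed already promised to in-flight writes: one full block
    // per write currently scheduled to this datanode.
    final long scheduledSize = blockSize * (long) blocksScheduledOnNode;
    return requiredSize <= remainingOnNode - scheduledSize;
  }
}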
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -27,22 +27,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -192,6 +192,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** A set of blocks to be invalidated by this datanode */
   private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
 
+  /* Variables for maintaining number of blocks scheduled to be written to
+   * this storage. This count is approximate and might be slightly bigger
+   * in case of errors (e.g. datanode does not report if an error occurs
+   * while writing the block).
+   */
+  private int currApproxBlocksScheduled = 0;
+  private int prevApproxBlocksScheduled = 0;
+  private long lastBlocksScheduledRollTime = 0;
+  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
 
   /**
@@ -342,7 +351,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     for (StorageReport report : reports) {
       DatanodeStorageInfo storage = storageMap.get(report.getStorageID());
       if (storage != null) {
-        storage.receivedHeartbeat(report, getLastUpdate());
+        storage.receivedHeartbeat(report);
         totalCapacity += report.getCapacity();
         totalRemaining += report.getRemaining();
         totalBlockPoolUsed += report.getBlockPoolUsed();
@@ -354,6 +363,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
         LOG.warn("Unrecognized storage ID " + report.getStorageID());
       }
     }
+    rollBlocksScheduled(getLastUpdate());
 
     // Update total metrics for the node.
     setCapacity(totalCapacity);
@@ -481,11 +491,31 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * to this datanode.
    */
   public int getBlocksScheduled() {
-    int n = 0;
-    for(DatanodeStorageInfo storage : getStorageInfos()) {
-      n += storage.getBlocksScheduled();
-    }
-    return n;
+    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
   }
 
+  /** Increment the number of blocks scheduled. */
+  void incrementBlocksScheduled() {
+    currApproxBlocksScheduled++;
+  }
+
+  /** Decrement the number of blocks scheduled. */
+  void decrementBlocksScheduled() {
+    if (prevApproxBlocksScheduled > 0) {
+      prevApproxBlocksScheduled--;
+    } else if (currApproxBlocksScheduled > 0) {
+      currApproxBlocksScheduled--;
+    }
+    // its ok if both counters are zero.
+  }
+
+  /** Adjusts curr and prev number of blocks scheduled every few minutes. */
+  private void rollBlocksScheduled(long now) {
+    if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
+      prevApproxBlocksScheduled = currApproxBlocksScheduled;
+      currApproxBlocksScheduled = 0;
+      lastBlocksScheduledRollTime = now;
+    }
+  }
 
   @Override
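The counters added above implement a two-bucket approximate gauge (note the copied comment still says "this storage", though the fields now live on the datanode). Increments land in the curr bucket; once per ten-minute interval the heartbeat-driven roll moves curr into prev and discards whatever was still in prev. Because decrements drain prev first and leftovers age out, a write the datanode never acknowledges inflates the estimate for at most about two roll intervals. A standalone condensation of that logic, with shortened field names:

// Condensed restatement of the committed logic; BlocksScheduledCounterSketch
// and the short names curr/prev are illustrative only.
class BlocksScheduledCounterSketch {
  private int curr = 0;   // writes scheduled in the current interval
  private int prev = 0;   // unacknowledged writes from the previous interval
  private long lastRollTime = 0;
  private static final long ROLL_INTERVAL_MS = 600 * 1000; // 10 minutes

  void increment() { curr++; }

  // Called when the datanode reports a scheduled block as received.
  void decrement() {
    if (prev > 0) {
      prev--;             // drain the older bucket first
    } else if (curr > 0) {
      curr--;
    }
    // ok if both are zero: a stale or duplicate report is simply ignored
  }

  // Approximate number of writes still in flight to this datanode.
  int get() { return curr + prev; }

  // Called from the heartbeat path: counts unacknowledged for a full
  // interval age into prev, and what was still in prev is dropped.
  void roll(long now) {
    if (now - lastRollTime > ROLL_INTERVAL_MS) {
      prev = curr;
      curr = 0;
      lastRollTime = now;
    }
  }
}

For example, scheduling three writes, acknowledging two, and then letting two roll intervals pass brings get() back to zero even though one write was never reported.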
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -121,16 +121,6 @@ public class DatanodeStorageInfo {
    */
   private boolean blockContentsStale = true;
 
-  /* Variables for maintaining number of blocks scheduled to be written to
-   * this storage. This count is approximate and might be slightly bigger
-   * in case of errors (e.g. datanode does not report if an error occurs
-   * while writing the block).
-   */
-  private int currApproxBlocksScheduled = 0;
-  private int prevApproxBlocksScheduled = 0;
-  private long lastBlocksScheduledRollTime = 0;
-  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
-
   public DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
     this.dn = dn;
     this.storageID = s.getStorageID();
@@ -155,10 +145,9 @@ public class DatanodeStorageInfo {
     blockContentsStale = true;
   }
 
-  void receivedHeartbeat(StorageReport report, final long lastUpdate) {
+  void receivedHeartbeat(StorageReport report) {
     updateState(report);
     heartbeatedSinceFailover = true;
-    rollBlocksScheduled(lastUpdate);
   }
 
   void receivedBlockReport() {
@@ -249,42 +238,10 @@ public class DatanodeStorageInfo {
     return dn;
   }
 
-  /**
-   * @return Approximate number of blocks currently scheduled to be written
-   * to this storage.
-   */
-  int getBlocksScheduled() {
-    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
-  }
-
   /** Increment the number of blocks scheduled for each given storage */
   public static void incrementBlocksScheduled(DatanodeStorageInfo... storages) {
     for (DatanodeStorageInfo s : storages) {
-      s.incrementBlocksScheduled();
+      s.getDatanodeDescriptor().incrementBlocksScheduled();
     }
   }
-
-  /** Increment the number of blocks scheduled. */
-  private void incrementBlocksScheduled() {
-    currApproxBlocksScheduled++;
-  }
-
-  /** Decrement the number of blocks scheduled. */
-  void decrementBlocksScheduled() {
-    if (prevApproxBlocksScheduled > 0) {
-      prevApproxBlocksScheduled--;
-    } else if (currApproxBlocksScheduled > 0) {
-      currApproxBlocksScheduled--;
-    }
-    // its ok if both counters are zero.
-  }
-
-  /** Adjusts curr and prev number of blocks scheduled every few minutes. */
-  private void rollBlocksScheduled(long now) {
-    if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
-      prevApproxBlocksScheduled = currApproxBlocksScheduled;
-      currApproxBlocksScheduled = 0;
-      lastBlocksScheduledRollTime = now;
-    }
-  }
 
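After this hunk, DatanodeStorageInfo keeps only the static varargs entry point and forwards each increment to the owning descriptor. A minimal sketch of that delegation; Descriptor and Storage are hypothetical stand-ins for DatanodeDescriptor and DatanodeStorageInfo:

// Sketch only; types and visibility simplified.
class Descriptor {
  private int currApproxBlocksScheduled = 0;
  void incrementBlocksScheduled() { currApproxBlocksScheduled++; }
}

class Storage {
  private final Descriptor dn;
  Storage(Descriptor dn) { this.dn = dn; }
  Descriptor getDatanodeDescriptor() { return dn; }

  // The storage-level entry point survives, but the count now lives on
  // the owning datanode:
  static void incrementBlocksScheduled(Storage... storages) {
    for (Storage s : storages) {
      s.getDatanodeDescriptor().incrementBlocksScheduled();
    }
  }
}

Keeping the static helper means callers that schedule a write against a set of chosen storages do not need to know where the counter lives.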
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -474,7 +474,7 @@ public class TestReplicationPolicy {
     assertFalse(log.size() == 0);
     final LoggingEvent lastLogEntry = log.get(log.size() - 1);
 
-    assertEquals(lastLogEntry.getLevel(), Level.WARN);
+    assertTrue(Level.WARN.isGreaterOrEqual(lastLogEntry.getLevel()));
     // Suppose to place replicas on each node but two data nodes are not
     // available for placing replica, so here we expect a short of 2
     assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));
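The relaxed assertion leans on log4j 1.x Priority.isGreaterOrEqual, which is a numeric comparison on the level values, so the test now accepts a last event at WARN or any less severe level rather than requiring exactly WARN. A small illustration; LevelCheckDemo is just a throwaway driver class:

import org.apache.log4j.Level;

public class LevelCheckDemo {
  public static void main(String[] args) {
    // WARN's numeric value is >= those of WARN, INFO, DEBUG...
    System.out.println(Level.WARN.isGreaterOrEqual(Level.INFO));  // true
    System.out.println(Level.WARN.isGreaterOrEqual(Level.WARN));  // true
    // ...but ERROR and FATAL rank higher, so the assertion would fail.
    System.out.println(Level.WARN.isGreaterOrEqual(Level.ERROR)); // false
  }
}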