HDFS-4350. Merging change 1441819 from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1441821 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6db7aa6f89
commit
0267141ec6
|
@ -21,6 +21,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
HDFS-4451. hdfs balancer command returns exit code 1 on success instead
|
||||
of 0. (Joshua Blatt via suresh)
|
||||
|
||||
HDFS-4350. Make enabling of stale marking on read and write paths
|
||||
independent. (Andrew Wang via suresh)
|
||||
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
|
|
|
@ -181,10 +181,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
|
||||
public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
|
||||
|
||||
// Whether to enable datanode's stale state detection and usage
|
||||
public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
|
||||
public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
|
||||
// Whether to enable datanode's stale state detection and usage
|
||||
// Whether to enable datanode's stale state detection and usage for reads
|
||||
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
|
||||
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
|
||||
// Whether to enable datanode's stale state detection and usage for writes
|
||||
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
|
||||
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
|
||||
// The default value of the time interval for marking datanodes as stale
|
||||
|
@ -195,8 +195,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
|
||||
public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
|
||||
|
||||
// When the number stale datanodes marked as stale reached this certian ratio,
|
||||
// stop avoiding writing to stale nodes so as to prevent causing hotspots.
|
||||
// When the percentage of stale datanodes reaches this ratio,
|
||||
// allow writing to stale nodes to prevent hotspots.
|
||||
public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
|
||||
public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
|
||||
|
||||
|
|
|
@ -140,7 +140,7 @@ public class DFSUtil {
|
|||
/**
|
||||
* Comparator for sorting DataNodeInfo[] based on decommissioned/stale states.
|
||||
* Decommissioned/stale nodes are moved to the end of the array on sorting
|
||||
* with this compartor.
|
||||
* with this comparator.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
|
||||
|
@ -150,7 +150,7 @@ public class DFSUtil {
|
|||
* Constructor of DecomStaleComparator
|
||||
*
|
||||
* @param interval
|
||||
* The time invertal for marking datanodes as stale is passed from
|
||||
* The time interval for marking datanodes as stale is passed from
|
||||
* outside, since the interval may be changed dynamically
|
||||
*/
|
||||
public DecomStaleComparator(long interval) {
|
||||
|
|
|
@ -125,14 +125,25 @@ public class DatanodeManager {
|
|||
/** Ask Datanode only up to this many blocks to delete. */
|
||||
final int blockInvalidateLimit;
|
||||
|
||||
/** Whether or not to check stale DataNodes for read/write */
|
||||
private final boolean checkForStaleDataNodes;
|
||||
|
||||
/** The interval for judging stale DataNodes for read/write */
|
||||
private final long staleInterval;
|
||||
|
||||
/** Whether or not to avoid using stale DataNodes for writing */
|
||||
private volatile boolean avoidStaleDataNodesForWrite;
|
||||
/** Whether or not to avoid using stale DataNodes for reading */
|
||||
private final boolean avoidStaleDataNodesForRead;
|
||||
|
||||
/**
|
||||
* Whether or not to avoid using stale DataNodes for writing.
|
||||
* Note that, even if this is configured, the policy may be
|
||||
* temporarily disabled when a high percentage of the nodes
|
||||
* are marked as stale.
|
||||
*/
|
||||
private final boolean avoidStaleDataNodesForWrite;
|
||||
|
||||
/**
|
||||
* When the ratio of stale datanodes reaches this number, stop avoiding
|
||||
* writing to stale datanodes, i.e., continue using stale nodes for writing.
|
||||
*/
|
||||
private final float ratioUseStaleDataNodesForWrite;
|
||||
|
||||
/** The number of stale DataNodes */
|
||||
private volatile int numStaleNodes;
|
||||
|
@ -180,13 +191,22 @@ public class DatanodeManager {
|
|||
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
|
||||
+ "=" + this.blockInvalidateLimit);
|
||||
|
||||
checkForStaleDataNodes = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
|
||||
|
||||
staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
|
||||
avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
|
||||
checkForStaleDataNodes);
|
||||
this.avoidStaleDataNodesForRead = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
|
||||
this.avoidStaleDataNodesForWrite = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
|
||||
this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
|
||||
this.ratioUseStaleDataNodesForWrite = conf.getFloat(
|
||||
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
|
||||
Preconditions.checkArgument(
|
||||
(ratioUseStaleDataNodesForWrite > 0 &&
|
||||
ratioUseStaleDataNodesForWrite <= 1.0f),
|
||||
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
|
||||
" = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
|
||||
"It should be a positive non-zero float value, not greater than 1.0f.");
|
||||
}
|
||||
|
||||
private static long getStaleIntervalFromConf(Configuration conf,
|
||||
|
@ -226,22 +246,6 @@ public class DatanodeManager {
|
|||
return staleInterval;
|
||||
}
|
||||
|
||||
static boolean getAvoidStaleForWriteFromConf(Configuration conf,
|
||||
boolean checkForStale) {
|
||||
boolean avoid = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
|
||||
boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
|
||||
if (!checkForStale && avoid) {
|
||||
LOG.warn("Cannot set "
|
||||
+ DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
|
||||
+ " as false while setting "
|
||||
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
|
||||
+ " as true.");
|
||||
}
|
||||
return avoidStaleDataNodesForWrite;
|
||||
}
|
||||
|
||||
void activate(final Configuration conf) {
|
||||
final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
|
||||
this.decommissionthread = new Daemon(dm.new Monitor(
|
||||
|
@ -286,7 +290,7 @@ public class DatanodeManager {
|
|||
//sort the blocks
|
||||
final DatanodeDescriptor client = getDatanodeByHost(targethost);
|
||||
|
||||
Comparator<DatanodeInfo> comparator = checkForStaleDataNodes ?
|
||||
Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
|
||||
new DFSUtil.DecomStaleComparator(staleInterval) :
|
||||
DFSUtil.DECOM_COMPARATOR;
|
||||
|
||||
|
@ -812,30 +816,18 @@ public class DatanodeManager {
|
|||
/* Getter and Setter for stale DataNodes related attributes */
|
||||
|
||||
/**
|
||||
* @return whether or not to avoid writing to stale datanodes
|
||||
*/
|
||||
public boolean isAvoidingStaleDataNodesForWrite() {
|
||||
return avoidStaleDataNodesForWrite;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}.
|
||||
* The HeartbeatManager disable avoidStaleDataNodesForWrite when more than
|
||||
* half of the DataNodes are marked as stale.
|
||||
* Whether stale datanodes should be avoided as targets on the write path.
|
||||
* The result of this function may change if the number of stale datanodes
|
||||
* eclipses a configurable threshold.
|
||||
*
|
||||
* @param avoidStaleDataNodesForWrite
|
||||
* The value to set to
|
||||
* {@link DatanodeManager#avoidStaleDataNodesForWrite}
|
||||
* @return whether stale datanodes should be avoided on the write path
|
||||
*/
|
||||
void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
|
||||
this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Whether or not to check stale DataNodes for R/W
|
||||
*/
|
||||
boolean isCheckingForStaleDataNodes() {
|
||||
return checkForStaleDataNodes;
|
||||
public boolean shouldAvoidStaleDataNodesForWrite() {
|
||||
// If # stale exceeds maximum staleness ratio, disable stale
|
||||
// datanode avoidance on the write path
|
||||
return avoidStaleDataNodesForWrite &&
|
||||
(numStaleNodes <= heartbeatManager.getLiveDatanodeCount()
|
||||
* ratioUseStaleDataNodesForWrite);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -30,8 +30,6 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
|||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Manage the heartbeats received from datanodes.
|
||||
* The datanode list and statistics are synchronized
|
||||
|
@ -56,16 +54,7 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
private final long heartbeatRecheckInterval;
|
||||
/** Heartbeat monitor thread */
|
||||
private final Daemon heartbeatThread = new Daemon(new Monitor());
|
||||
/**
|
||||
* The initial setting of end user which indicates whether or not to avoid
|
||||
* writing to stale datanodes.
|
||||
*/
|
||||
private final boolean initialAvoidWriteStaleNodes;
|
||||
/**
|
||||
* When the ratio of stale datanodes reaches this number, stop avoiding
|
||||
* writing to stale datanodes, i.e., continue using stale nodes for writing.
|
||||
*/
|
||||
private final float ratioUseStaleDataNodesForWrite;
|
||||
|
||||
|
||||
final Namesystem namesystem;
|
||||
final BlockManager blockManager;
|
||||
|
@ -74,30 +63,25 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
final BlockManager blockManager, final Configuration conf) {
|
||||
this.namesystem = namesystem;
|
||||
this.blockManager = blockManager;
|
||||
boolean checkStaleNodes = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
|
||||
boolean avoidStaleDataNodesForWrite = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
|
||||
long recheckInterval = conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
|
||||
long staleInterval = conf.getLong(
|
||||
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
|
||||
this.initialAvoidWriteStaleNodes = DatanodeManager
|
||||
.getAvoidStaleForWriteFromConf(conf, checkStaleNodes);
|
||||
this.ratioUseStaleDataNodesForWrite = conf.getFloat(
|
||||
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
|
||||
Preconditions.checkArgument(
|
||||
(ratioUseStaleDataNodesForWrite > 0 &&
|
||||
ratioUseStaleDataNodesForWrite <= 1.0f),
|
||||
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
|
||||
" = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
|
||||
"It should be a positive non-zero float value, not greater than 1.0f.");
|
||||
|
||||
this.heartbeatRecheckInterval = (checkStaleNodes
|
||||
&& initialAvoidWriteStaleNodes
|
||||
&& staleInterval < recheckInterval) ? staleInterval : recheckInterval;
|
||||
if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
|
||||
this.heartbeatRecheckInterval = staleInterval;
|
||||
LOG.info("Setting heartbeat recheck interval to " + staleInterval
|
||||
+ " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
|
||||
+ " is less than "
|
||||
+ DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
|
||||
} else {
|
||||
this.heartbeatRecheckInterval = recheckInterval;
|
||||
}
|
||||
}
|
||||
|
||||
void activate(Configuration conf) {
|
||||
|
@ -242,7 +226,6 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
if (namesystem.isInSafeMode()) {
|
||||
return;
|
||||
}
|
||||
boolean checkStaleNodes = dm.isCheckingForStaleDataNodes();
|
||||
boolean allAlive = false;
|
||||
while (!allAlive) {
|
||||
// locate the first dead node.
|
||||
|
@ -254,29 +237,14 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
if (dead == null && dm.isDatanodeDead(d)) {
|
||||
stats.incrExpiredHeartbeats();
|
||||
dead = d;
|
||||
if (!checkStaleNodes) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (checkStaleNodes &&
|
||||
d.isStale(dm.getStaleInterval())) {
|
||||
if (d.isStale(dm.getStaleInterval())) {
|
||||
numOfStaleNodes++;
|
||||
}
|
||||
}
|
||||
|
||||
// Change whether to avoid using stale datanodes for writing
|
||||
// based on proportion of stale datanodes
|
||||
if (checkStaleNodes) {
|
||||
dm.setNumStaleNodes(numOfStaleNodes);
|
||||
if (numOfStaleNodes >
|
||||
datanodes.size() * ratioUseStaleDataNodesForWrite) {
|
||||
dm.setAvoidStaleDataNodesForWrite(false);
|
||||
} else {
|
||||
if (this.initialAvoidWriteStaleNodes) {
|
||||
dm.setAvoidStaleDataNodesForWrite(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Set the number of stale nodes in the DatanodeManager
|
||||
dm.setNumStaleNodes(numOfStaleNodes);
|
||||
}
|
||||
|
||||
allAlive = dead == null;
|
||||
|
|
|
@ -5571,7 +5571,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
@Override
|
||||
public boolean isAvoidingStaleDataNodesForWrite() {
|
||||
return this.blockManager.getDatanodeManager()
|
||||
.isAvoidingStaleDataNodesForWrite();
|
||||
.shouldAvoidStaleDataNodesForWrite();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -999,17 +999,14 @@
|
|||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.check.stale.datanode</name>
|
||||
<name>dfs.namenode.avoid.read.stale.datanode</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Indicate whether or not to check "stale" datanodes whose
|
||||
Indicate whether or not to avoid reading from "stale" datanodes whose
|
||||
heartbeat messages have not been received by the namenode
|
||||
for more than a specified time interval. If this configuration
|
||||
parameter is set as true, the system will keep track
|
||||
of the number of stale datanodes. The stale datanodes will be
|
||||
for more than a specified time interval. Stale datanodes will be
|
||||
moved to the end of the node list returned for reading. See
|
||||
dfs.namenode.avoid.write.stale.datanode for details on how this
|
||||
affects writes.
|
||||
dfs.namenode.avoid.write.stale.datanode for a similar setting for writes.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
@ -1017,13 +1014,13 @@
|
|||
<name>dfs.namenode.avoid.write.stale.datanode</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Indicate whether or not to avoid writing to "stale" datanodes whose
|
||||
Indicate whether or not to avoid writing to "stale" datanodes whose
|
||||
heartbeat messages have not been received by the namenode
|
||||
for more than a specified time interval. If this configuration
|
||||
parameter and dfs.namenode.check.stale.datanode are both set as true,
|
||||
the writing will avoid using stale datanodes unless a high number
|
||||
of datanodes are marked as stale. See
|
||||
dfs.namenode.write.stale.datanode.ratio for details.
|
||||
for more than a specified time interval. Writes will avoid using
|
||||
stale datanodes unless more than a configured ratio
|
||||
(dfs.namenode.write.stale.datanode.ratio) of datanodes are marked as
|
||||
stale. See dfs.namenode.avoid.read.stale.datanode for a similar setting
|
||||
for reads.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ public class TestGetBlocks {
|
|||
@Test
|
||||
public void testReadSelectNonStaleDatanode() throws Exception {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
||||
long staleInterval = 30 * 1000 * 60;
|
||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
|
||||
staleInterval);
|
||||
|
|
|
@ -87,9 +87,11 @@ public class TestReplicationPolicy {
|
|||
"test.build.data", "build/test/data"), "dfs/");
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
||||
new File(baseDir, "name").getPath());
|
||||
// Enable the checking for stale datanodes in the beginning
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
|
||||
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
|
||||
DFSTestUtil.formatNameNode(conf);
|
||||
namenode = new NameNode(conf);
|
||||
|
||||
|
@ -99,6 +101,8 @@ public class TestReplicationPolicy {
|
|||
// construct network topology
|
||||
for (int i=0; i < NUM_OF_DATANODES; i++) {
|
||||
cluster.add(dataNodes[i]);
|
||||
bm.getDatanodeManager().getHeartbeatManager().addDatanode(
|
||||
dataNodes[i]);
|
||||
}
|
||||
for (int i=0; i < NUM_OF_DATANODES; i++) {
|
||||
dataNodes[i].updateHeartbeat(
|
||||
|
@ -392,11 +396,11 @@ public class TestReplicationPolicy {
|
|||
throws Exception {
|
||||
try {
|
||||
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
||||
.setAvoidStaleDataNodesForWrite(true);
|
||||
.setNumStaleNodes(NUM_OF_DATANODES);
|
||||
testChooseTargetWithMoreThanAvailableNodes();
|
||||
} finally {
|
||||
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
||||
.setAvoidStaleDataNodesForWrite(false);
|
||||
.setNumStaleNodes(0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -478,12 +482,12 @@ public class TestReplicationPolicy {
|
|||
|
||||
@Test
|
||||
public void testChooseTargetWithStaleNodes() throws Exception {
|
||||
// Enable avoidng writing to stale datanodes
|
||||
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
||||
.setAvoidStaleDataNodesForWrite(true);
|
||||
// Set dataNodes[0] as stale
|
||||
dataNodes[0].setLastUpdate(Time.now() - staleInterval - 1);
|
||||
|
||||
namenode.getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
|
||||
assertTrue(namenode.getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
|
||||
DatanodeDescriptor[] targets;
|
||||
// We set the datanode[0] as stale, thus should choose datanode[1] since
|
||||
// datanode[1] is on the same rack with datanode[0] (writer)
|
||||
|
@ -502,9 +506,9 @@ public class TestReplicationPolicy {
|
|||
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
|
||||
|
||||
// reset
|
||||
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
||||
.setAvoidStaleDataNodesForWrite(false);
|
||||
dataNodes[0].setLastUpdate(Time.now());
|
||||
namenode.getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -517,20 +521,20 @@ public class TestReplicationPolicy {
|
|||
*/
|
||||
@Test
|
||||
public void testChooseTargetWithHalfStaleNodes() throws Exception {
|
||||
// Enable stale datanodes checking
|
||||
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
||||
.setAvoidStaleDataNodesForWrite(true);
|
||||
// Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
|
||||
for (int i = 0; i < 3; i++) {
|
||||
dataNodes[i].setLastUpdate(Time.now() - staleInterval - 1);
|
||||
}
|
||||
namenode.getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
|
||||
|
||||
DatanodeDescriptor[] targets;
|
||||
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
|
||||
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
|
||||
assertEquals(targets.length, 0);
|
||||
|
||||
// We set the datanode[0] as stale, thus should choose datanode[1]
|
||||
// Since we have 6 datanodes total, stale nodes should
|
||||
// not be returned until we ask for more than 3 targets
|
||||
targets = replicator.chooseTarget(filename, 1, dataNodes[0],
|
||||
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
|
||||
assertEquals(targets.length, 1);
|
||||
|
@ -556,18 +560,16 @@ public class TestReplicationPolicy {
|
|||
assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
|
||||
assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));
|
||||
|
||||
// reset
|
||||
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
||||
.setAvoidStaleDataNodesForWrite(false);
|
||||
for (int i = 0; i < dataNodes.length; i++) {
|
||||
dataNodes[i].setLastUpdate(Time.now());
|
||||
}
|
||||
namenode.getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
|
||||
String[] hosts = new String[]{"host1", "host2", "host3",
|
||||
|
@ -597,7 +599,7 @@ public class TestReplicationPolicy {
|
|||
.getBlockManager().getDatanodeManager().getNumStaleNodes();
|
||||
assertEquals(numStaleNodes, 2);
|
||||
assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().isAvoidingStaleDataNodesForWrite());
|
||||
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
|
||||
// Call chooseTarget
|
||||
DatanodeDescriptor staleNodeInfo = miniCluster.getNameNode()
|
||||
.getNamesystem().getBlockManager().getDatanodeManager()
|
||||
|
@ -626,7 +628,7 @@ public class TestReplicationPolicy {
|
|||
// According to our strategy, stale datanodes will be included for writing
|
||||
// to avoid hotspots
|
||||
assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().isAvoidingStaleDataNodesForWrite());
|
||||
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
|
||||
// Call chooseTarget
|
||||
targets = replicator.chooseTarget(filename, 3,
|
||||
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
|
||||
|
@ -649,7 +651,7 @@ public class TestReplicationPolicy {
|
|||
.getBlockManager().getDatanodeManager().getNumStaleNodes();
|
||||
assertEquals(numStaleNodes, 2);
|
||||
assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
|
||||
.getDatanodeManager().isAvoidingStaleDataNodesForWrite());
|
||||
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
|
||||
// Call chooseTarget
|
||||
targets = replicator.chooseTarget(filename, 3,
|
||||
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
|
||||
|
|
|
@ -82,7 +82,7 @@ public class TestNameNodeMetrics {
|
|||
CONF.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
|
||||
"" + PERCENTILES_INTERVAL);
|
||||
// Enable stale DataNodes checking
|
||||
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
|
||||
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
|
||||
((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
|
||||
.getLogger().setLevel(Level.DEBUG);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue