HDFS-4350. Make enabling of stale marking on read and write paths independent. Contributed by Andrew Wang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1441819 13f79535-47bb-0310-9956-ffa450edef68
commit 8590564dc5
parent 61a262757c
@@ -317,6 +317,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4451. hdfs balancer command returns exit code 1 on success instead
     of 0. (Joshua Blatt via suresh)
 
+    HDFS-4350. Make enabling of stale marking on read and write paths
+    independent. (Andrew Wang via suresh)
+
   NEW FEATURES
 
 
@@ -181,10 +181,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
   public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
 
-  // Whether to enable datanode's stale state detection and usage
-  public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
-  public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
-  // Whether to enable datanode's stale state detection and usage
+  // Whether to enable datanode's stale state detection and usage for reads
+  public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
+  public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
+  // Whether to enable datanode's stale state detection and usage for writes
   public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
   public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
   // The default value of the time interval for marking datanodes as stale
@@ -195,8 +195,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
   public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
 
-  // When the number stale datanodes marked as stale reached this certian ratio,
-  // stop avoiding writing to stale nodes so as to prevent causing hotspots.
+  // When the percentage of stale datanodes reaches this ratio,
+  // allow writing to stale nodes to prevent hotspots.
   public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
   public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
 
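Note: after this rename, read-path and write-path stale-node avoidance are driven by two separate boolean keys plus the existing ratio key. A minimal, hypothetical sketch (not part of this patch; class and method names are illustrative) of toggling them independently through a plain Configuration object:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class StaleAvoidanceConfigSketch {
      public static Configuration readOnlyAvoidance() {
        Configuration conf = new Configuration();
        // Push stale datanodes to the end of read pipelines...
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
        // ...but keep choosing them as write targets (the default).
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, false);
        return conf;
      }
    }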
@@ -134,7 +134,7 @@ public class DFSUtil {
   /**
    * Comparator for sorting DataNodeInfo[] based on decommissioned/stale states.
    * Decommissioned/stale nodes are moved to the end of the array on sorting
-   * with this compartor.
+   * with this comparator.
    */
   @InterfaceAudience.Private
   public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
@@ -144,7 +144,7 @@ public class DFSUtil {
    * Constructor of DecomStaleComparator
    *
    * @param interval
-   *          The time invertal for marking datanodes as stale is passed from
+   *          The time interval for marking datanodes as stale is passed from
    *          outside, since the interval may be changed dynamically
    */
   public DecomStaleComparator(long interval) {
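As a usage sketch only (this call pattern is assumed, not shown in the patch), the comparator can be applied directly to a DatanodeInfo array so that decommissioned and stale replicas sort to the end:

    import java.util.Arrays;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    class ComparatorSketch {
      // staleIntervalMs is whatever interval the caller treats as "stale".
      static void sortHealthyReplicasFirst(DatanodeInfo[] locations, long staleIntervalMs) {
        // Healthy nodes first; decommissioned/stale nodes are pushed to the end.
        Arrays.sort(locations, new DFSUtil.DecomStaleComparator(staleIntervalMs));
      }
    }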
@@ -126,15 +126,26 @@ public class DatanodeManager {
   private final long heartbeatExpireInterval;
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
 
-  /** Whether or not to check stale DataNodes for read/write */
-  private final boolean checkForStaleDataNodes;
-
   /** The interval for judging stale DataNodes for read/write */
   private final long staleInterval;
 
-  /** Whether or not to avoid using stale DataNodes for writing */
-  private volatile boolean avoidStaleDataNodesForWrite;
+  /** Whether or not to avoid using stale DataNodes for reading */
+  private final boolean avoidStaleDataNodesForRead;
+
+  /**
+   * Whether or not to avoid using stale DataNodes for writing.
+   * Note that, even if this is configured, the policy may be
+   * temporarily disabled when a high percentage of the nodes
+   * are marked as stale.
+   */
+  private final boolean avoidStaleDataNodesForWrite;
+
+  /**
+   * When the ratio of stale datanodes reaches this number, stop avoiding
+   * writing to stale datanodes, i.e., continue using stale nodes for writing.
+   */
+  private final float ratioUseStaleDataNodesForWrite;
 
   /** The number of stale DataNodes */
   private volatile int numStaleNodes;
@@ -183,14 +194,23 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
     LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
         + "=" + this.blockInvalidateLimit);
 
-    checkForStaleDataNodes = conf.getBoolean(
-        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
-        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
-
-    staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
-    avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
-        checkForStaleDataNodes);
+    this.avoidStaleDataNodesForRead = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
+    this.avoidStaleDataNodesForWrite = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
+    this.staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
+    this.ratioUseStaleDataNodesForWrite = conf.getFloat(
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
+    Preconditions.checkArgument(
+        (ratioUseStaleDataNodesForWrite > 0 &&
+          ratioUseStaleDataNodesForWrite <= 1.0f),
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
+        " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
+        "It should be a positive non-zero float value, not greater than 1.0f.");
   }
 
   private static long getStaleIntervalFromConf(Configuration conf,
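The Preconditions check above is Guava's: checkArgument throws IllegalArgumentException when the condition is false, so an out-of-range dfs.namenode.write.stale.datanode.ratio now fails DatanodeManager construction instead of being silently accepted. A tiny standalone sketch of the same validation (names are illustrative, not from the patch):

    import com.google.common.base.Preconditions;

    class RatioValidationSketch {
      static float validate(float ratio) {
        // Accept only values in (0, 1.0], mirroring the constructor check.
        Preconditions.checkArgument(ratio > 0 && ratio <= 1.0f,
            "dfs.namenode.write.stale.datanode.ratio = '" + ratio + "' is invalid.");
        return ratio;
      }
    }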
@@ -230,22 +250,6 @@ public class DatanodeManager {
     return staleInterval;
   }
 
-  static boolean getAvoidStaleForWriteFromConf(Configuration conf,
-      boolean checkForStale) {
-    boolean avoid = conf.getBoolean(
-        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
-        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
-    boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
-    if (!checkForStale && avoid) {
-      LOG.warn("Cannot set "
-          + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
-          + " as false while setting "
-          + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
-          + " as true.");
-    }
-    return avoidStaleDataNodesForWrite;
-  }
-
   void activate(final Configuration conf) {
     final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
     this.decommissionthread = new Daemon(dm.new Monitor(
@@ -299,7 +303,7 @@ public class DatanodeManager {
       client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
     }
 
-    Comparator<DatanodeInfo> comparator = checkForStaleDataNodes ?
+    Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
         new DFSUtil.DecomStaleComparator(staleInterval) :
         DFSUtil.DECOM_COMPARATOR;
 
@@ -825,32 +829,20 @@ public class DatanodeManager {
   }
 
   /* Getter and Setter for stale DataNodes related attributes */
-
-  /**
-   * @return whether or not to avoid writing to stale datanodes
-   */
-  public boolean isAvoidingStaleDataNodesForWrite() {
-    return avoidStaleDataNodesForWrite;
-  }
 
   /**
-   * Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}.
-   * The HeartbeatManager disable avoidStaleDataNodesForWrite when more than
-   * half of the DataNodes are marked as stale.
+   * Whether stale datanodes should be avoided as targets on the write path.
+   * The result of this function may change if the number of stale datanodes
+   * eclipses a configurable threshold.
    *
-   * @param avoidStaleDataNodesForWrite
-   *          The value to set to
-   *          {@link DatanodeManager#avoidStaleDataNodesForWrite}
+   * @return whether stale datanodes should be avoided on the write path
    */
-  void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
-    this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
-  }
-
-  /**
-   * @return Whether or not to check stale DataNodes for R/W
-   */
-  boolean isCheckingForStaleDataNodes() {
-    return checkForStaleDataNodes;
+  public boolean shouldAvoidStaleDataNodesForWrite() {
+    // If # stale exceeds maximum staleness ratio, disable stale
+    // datanode avoidance on the write path
+    return avoidStaleDataNodesForWrite &&
+        (numStaleNodes <= heartbeatManager.getLiveDatanodeCount()
+            * ratioUseStaleDataNodesForWrite);
   }
 
   /**
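The new shouldAvoidStaleDataNodesForWrite() boils down to a ratio test against the live-node count. A standalone sketch of the same arithmetic (method and variable names here are illustrative, not from the patch):

    class WriteAvoidanceSketch {
      static boolean shouldAvoid(boolean avoidConfigured, int numStaleNodes,
          int liveNodes, float ratio) {
        // Keep avoiding stale write targets only while the stale count stays
        // at or below liveNodes * ratio; beyond that, fall back to using them.
        return avoidConfigured && (numStaleNodes <= liveNodes * ratio);
      }

      public static void main(String[] args) {
        // With the 0.5f default ratio and 6 live nodes:
        System.out.println(shouldAvoid(true, 3, 6, 0.5f)); // true  (3 <= 3.0)
        System.out.println(shouldAvoid(true, 4, 6, 0.5f)); // false (4 >  3.0)
      }
    }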
@@ -30,8 +30,6 @@ import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Manage the heartbeats received from datanodes.
  * The datanode list and statistics are synchronized
@@ -56,16 +54,7 @@ class HeartbeatManager implements DatanodeStatistics {
   private final long heartbeatRecheckInterval;
   /** Heartbeat monitor thread */
   private final Daemon heartbeatThread = new Daemon(new Monitor());
-  /**
-   * The initial setting of end user which indicates whether or not to avoid
-   * writing to stale datanodes.
-   */
-  private final boolean initialAvoidWriteStaleNodes;
-  /**
-   * When the ratio of stale datanodes reaches this number, stop avoiding
-   * writing to stale datanodes, i.e., continue using stale nodes for writing.
-   */
-  private final float ratioUseStaleDataNodesForWrite;
 
   final Namesystem namesystem;
   final BlockManager blockManager;
@@ -74,30 +63,25 @@ class HeartbeatManager implements DatanodeStatistics {
       final BlockManager blockManager, final Configuration conf) {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
-    boolean checkStaleNodes = conf.getBoolean(
-        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
-        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
+    boolean avoidStaleDataNodesForWrite = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
     long recheckInterval = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
         DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
     long staleInterval = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
         DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
-    this.initialAvoidWriteStaleNodes = DatanodeManager
-        .getAvoidStaleForWriteFromConf(conf, checkStaleNodes);
-    this.ratioUseStaleDataNodesForWrite = conf.getFloat(
-        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
-        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
-    Preconditions.checkArgument(
-        (ratioUseStaleDataNodesForWrite > 0 &&
-          ratioUseStaleDataNodesForWrite <= 1.0f),
-        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
-        " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
-        "It should be a positive non-zero float value, not greater than 1.0f.");
-
-    this.heartbeatRecheckInterval = (checkStaleNodes
-        && initialAvoidWriteStaleNodes
-        && staleInterval < recheckInterval) ? staleInterval : recheckInterval;
+    if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
+      this.heartbeatRecheckInterval = staleInterval;
+      LOG.info("Setting heartbeat recheck interval to " + staleInterval
+          + " since " + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY
+          + " is less than "
+          + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
+    } else {
+      this.heartbeatRecheckInterval = recheckInterval;
+    }
   }
 
   void activate(Configuration conf) {
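With the defaults noted in this hunk (a 30s stale interval, a 5 minute recheck interval), enabling write-path avoidance shortens the heartbeat monitor's recheck period so the stale count stays fresh. A hypothetical sketch of that selection, not taken verbatim from the patch:

    class RecheckIntervalSketch {
      static long chooseRecheckInterval(boolean avoidStaleForWrite,
          long staleIntervalMs, long recheckIntervalMs) {
        // Recheck at least as often as nodes can go stale when the write
        // path depends on an up-to-date stale count.
        return (avoidStaleForWrite && staleIntervalMs < recheckIntervalMs)
            ? staleIntervalMs : recheckIntervalMs;
      }

      public static void main(String[] args) {
        System.out.println(chooseRecheckInterval(true, 30000L, 300000L));  // 30000
        System.out.println(chooseRecheckInterval(false, 30000L, 300000L)); // 300000
      }
    }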
@@ -242,7 +226,6 @@ class HeartbeatManager implements DatanodeStatistics {
       if (namesystem.isInSafeMode()) {
         return;
       }
-      boolean checkStaleNodes = dm.isCheckingForStaleDataNodes();
       boolean allAlive = false;
       while (!allAlive) {
         // locate the first dead node.
@@ -254,29 +237,14 @@ class HeartbeatManager implements DatanodeStatistics {
         if (dead == null && dm.isDatanodeDead(d)) {
           stats.incrExpiredHeartbeats();
           dead = d;
-          if (!checkStaleNodes) {
-            break;
-          }
         }
-        if (checkStaleNodes &&
-            d.isStale(dm.getStaleInterval())) {
+        if (d.isStale(dm.getStaleInterval())) {
           numOfStaleNodes++;
         }
       }
 
-      // Change whether to avoid using stale datanodes for writing
-      // based on proportion of stale datanodes
-      if (checkStaleNodes) {
-        dm.setNumStaleNodes(numOfStaleNodes);
-        if (numOfStaleNodes >
-              datanodes.size() * ratioUseStaleDataNodesForWrite) {
-          dm.setAvoidStaleDataNodesForWrite(false);
-        } else {
-          if (this.initialAvoidWriteStaleNodes) {
-            dm.setAvoidStaleDataNodesForWrite(true);
-          }
-        }
-      }
+      // Set the number of stale nodes in the DatanodeManager
+      dm.setNumStaleNodes(numOfStaleNodes);
     }
 
     allAlive = dead == null;
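heartbeatCheck() now always tallies stale nodes and hands the count to the DatanodeManager; the write-path decision is made later, on demand, by shouldAvoidStaleDataNodesForWrite(). Roughly, a node counts as stale once its last heartbeat is older than the configured interval (the tests below force this by rewinding lastUpdate past staleInterval). A simplified sketch of that predicate, with assumed names:

    class StalenessSketch {
      static boolean isStale(long lastUpdateMs, long nowMs, long staleIntervalMs) {
        // Stale means the last heartbeat is older than the stale interval.
        return (nowMs - lastUpdateMs) > staleIntervalMs;
      }
    }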
@@ -5636,7 +5636,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   @Override
   public boolean isAvoidingStaleDataNodesForWrite() {
     return this.blockManager.getDatanodeManager()
-        .isAvoidingStaleDataNodesForWrite();
+        .shouldAvoidStaleDataNodesForWrite();
   }
 
   /**
@@ -999,17 +999,14 @@
 </property>
 
 <property>
-  <name>dfs.namenode.check.stale.datanode</name>
+  <name>dfs.namenode.avoid.read.stale.datanode</name>
   <value>false</value>
   <description>
-    Indicate whether or not to check "stale" datanodes whose
+    Indicate whether or not to avoid reading from "stale" datanodes whose
     heartbeat messages have not been received by the namenode
-    for more than a specified time interval. If this configuration
-    parameter is set as true, the system will keep track
-    of the number of stale datanodes. The stale datanodes will be
+    for more than a specified time interval. Stale datanodes will be
     moved to the end of the node list returned for reading. See
-    dfs.namenode.avoid.write.stale.datanode for details on how this
-    affects writes.
+    dfs.namenode.avoid.write.stale.datanode for a similar setting for writes.
   </description>
 </property>
 
@@ -1017,13 +1014,13 @@
   <name>dfs.namenode.avoid.write.stale.datanode</name>
   <value>false</value>
   <description>
     Indicate whether or not to avoid writing to "stale" datanodes whose
     heartbeat messages have not been received by the namenode
-    for more than a specified time interval. If this configuration
-    parameter and dfs.namenode.check.stale.datanode are both set as true,
-    the writing will avoid using stale datanodes unless a high number
-    of datanodes are marked as stale. See
-    dfs.namenode.write.stale.datanode.ratio for details.
+    for more than a specified time interval. Writes will avoid using
+    stale datanodes unless more than a configured ratio
+    (dfs.namenode.write.stale.datanode.ratio) of datanodes are marked as
+    stale. See dfs.namenode.avoid.read.stale.datanode for a similar setting
+    for reads.
   </description>
 </property>
 
@@ -88,7 +88,7 @@ public class TestGetBlocks {
   @Test
   public void testReadSelectNonStaleDatanode() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
     long staleInterval = 30 * 1000 * 60;
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
         staleInterval);
@@ -88,9 +88,11 @@ public class TestReplicationPolicy {
         "test.build.data", "build/test/data"), "dfs/");
     conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
         new File(baseDir, "name").getPath());
-    // Enable the checking for stale datanodes in the beginning
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
 
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
     DFSTestUtil.formatNameNode(conf);
     namenode = new NameNode(conf);
 
@@ -100,6 +102,8 @@ public class TestReplicationPolicy {
     // construct network topology
     for (int i=0; i < NUM_OF_DATANODES; i++) {
       cluster.add(dataNodes[i]);
+      bm.getDatanodeManager().getHeartbeatManager().addDatanode(
+          dataNodes[i]);
     }
     for (int i=0; i < NUM_OF_DATANODES; i++) {
       dataNodes[i].updateHeartbeat(
@@ -393,11 +397,11 @@ public class TestReplicationPolicy {
       throws Exception {
     try {
       namenode.getNamesystem().getBlockManager().getDatanodeManager()
-        .setAvoidStaleDataNodesForWrite(true);
+        .setNumStaleNodes(NUM_OF_DATANODES);
       testChooseTargetWithMoreThanAvailableNodes();
     } finally {
       namenode.getNamesystem().getBlockManager().getDatanodeManager()
-        .setAvoidStaleDataNodesForWrite(false);
+        .setNumStaleNodes(0);
     }
   }
 
@ -479,12 +483,12 @@ public class TestReplicationPolicy {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testChooseTargetWithStaleNodes() throws Exception {
|
public void testChooseTargetWithStaleNodes() throws Exception {
|
||||||
// Enable avoidng writing to stale datanodes
|
|
||||||
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
|
||||||
.setAvoidStaleDataNodesForWrite(true);
|
|
||||||
// Set dataNodes[0] as stale
|
// Set dataNodes[0] as stale
|
||||||
dataNodes[0].setLastUpdate(Time.now() - staleInterval - 1);
|
dataNodes[0].setLastUpdate(Time.now() - staleInterval - 1);
|
||||||
|
namenode.getNamesystem().getBlockManager()
|
||||||
|
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
|
||||||
|
assertTrue(namenode.getNamesystem().getBlockManager()
|
||||||
|
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
|
||||||
DatanodeDescriptor[] targets;
|
DatanodeDescriptor[] targets;
|
||||||
// We set the datanode[0] as stale, thus should choose datanode[1] since
|
// We set the datanode[0] as stale, thus should choose datanode[1] since
|
||||||
// datanode[1] is on the same rack with datanode[0] (writer)
|
// datanode[1] is on the same rack with datanode[0] (writer)
|
||||||
|
@@ -503,9 +507,9 @@ public class TestReplicationPolicy {
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
 
     // reset
-    namenode.getNamesystem().getBlockManager().getDatanodeManager()
-      .setAvoidStaleDataNodesForWrite(false);
     dataNodes[0].setLastUpdate(Time.now());
+    namenode.getNamesystem().getBlockManager()
+      .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
   }
 
   /**
@@ -518,20 +522,20 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testChooseTargetWithHalfStaleNodes() throws Exception {
-    // Enable stale datanodes checking
-    namenode.getNamesystem().getBlockManager().getDatanodeManager()
-      .setAvoidStaleDataNodesForWrite(true);
     // Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
     for (int i = 0; i < 3; i++) {
       dataNodes[i].setLastUpdate(Time.now() - staleInterval - 1);
     }
+    namenode.getNamesystem().getBlockManager()
+      .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
 
     DatanodeDescriptor[] targets;
     targets = replicator.chooseTarget(filename, 0, dataNodes[0],
         new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
     assertEquals(targets.length, 0);
 
-    // We set the datanode[0] as stale, thus should choose datanode[1]
+    // Since we have 6 datanodes total, stale nodes should
+    // not be returned until we ask for more than 3 targets
     targets = replicator.chooseTarget(filename, 1, dataNodes[0],
         new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
     assertEquals(targets.length, 1);
@@ -557,18 +561,16 @@ public class TestReplicationPolicy {
     assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
     assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));
 
-    // reset
-    namenode.getNamesystem().getBlockManager().getDatanodeManager()
-      .setAvoidStaleDataNodesForWrite(false);
     for (int i = 0; i < dataNodes.length; i++) {
       dataNodes[i].setLastUpdate(Time.now());
     }
+    namenode.getNamesystem().getBlockManager()
+      .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
   }
 
   @Test
   public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
     conf.setBoolean(
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
     String[] hosts = new String[]{"host1", "host2", "host3",
@@ -598,7 +600,7 @@ public class TestReplicationPolicy {
         .getBlockManager().getDatanodeManager().getNumStaleNodes();
     assertEquals(numStaleNodes, 2);
     assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
-        .getDatanodeManager().isAvoidingStaleDataNodesForWrite());
+        .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
     // Call chooseTarget
     DatanodeDescriptor staleNodeInfo = miniCluster.getNameNode()
         .getNamesystem().getBlockManager().getDatanodeManager()
@@ -627,7 +629,7 @@ public class TestReplicationPolicy {
     // According to our strategy, stale datanodes will be included for writing
     // to avoid hotspots
     assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
-        .getDatanodeManager().isAvoidingStaleDataNodesForWrite());
+        .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
     // Call chooseTarget
     targets = replicator.chooseTarget(filename, 3,
         staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
@@ -650,7 +652,7 @@ public class TestReplicationPolicy {
         .getBlockManager().getDatanodeManager().getNumStaleNodes();
     assertEquals(numStaleNodes, 2);
     assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
-        .getDatanodeManager().isAvoidingStaleDataNodesForWrite());
+        .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
     // Call chooseTarget
     targets = replicator.chooseTarget(filename, 3,
         staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
@@ -82,7 +82,7 @@ public class TestNameNodeMetrics {
     CONF.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY,
         "" + PERCENTILES_INTERVAL);
     // Enable stale DataNodes checking
-    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
+    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
     ((Log4JLogger)LogFactory.getLog(MetricsAsserts.class))
         .getLogger().setLevel(Level.DEBUG);
   }