HDFS-14968. Add ability to log stale datanodes. Contributed by Ahmed Hussein.
This commit is contained in:
parent
d40d7cc4f9
commit
bd03053ea2
|
@ -483,6 +483,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
// Whether to enable datanode's stale state detection and usage for writes
|
// Whether to enable datanode's stale state detection and usage for writes
|
||||||
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
|
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
|
||||||
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
|
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
|
||||||
|
// enable and disable logging datanode staleness. Disabled by default.
|
||||||
|
public static final String DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_KEY =
|
||||||
|
"dfs.namenode.enable.log.stale.datanode";
|
||||||
|
public static final boolean DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_DEFAULT =
|
||||||
|
false;
|
||||||
// The default value of the time interval for marking datanodes as stale
|
// The default value of the time interval for marking datanodes as stale
|
||||||
public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
|
public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
|
||||||
public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
|
public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
|
||||||
|
|
|
@ -18,8 +18,10 @@
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -43,7 +45,15 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
*/
|
*/
|
||||||
class HeartbeatManager implements DatanodeStatistics {
|
class HeartbeatManager implements DatanodeStatistics {
|
||||||
static final Logger LOG = LoggerFactory.getLogger(HeartbeatManager.class);
|
static final Logger LOG = LoggerFactory.getLogger(HeartbeatManager.class);
|
||||||
|
private static final String REPORT_DELTA_STALE_DN_HEADER =
|
||||||
|
"StaleNodes Report: [New Stale Nodes]: %d";
|
||||||
|
private static final String REPORT_STALE_DN_LINE_ENTRY = "%n\t %s";
|
||||||
|
private static final String REPORT_STALE_DN_LINE_TAIL = ", %s";
|
||||||
|
private static final String REPORT_REMOVE_DEAD_NODE_ENTRY =
|
||||||
|
"StaleNodes Report: [Remove DeadNode]: %s";
|
||||||
|
private static final String REPORT_REMOVE_STALE_NODE_ENTRY =
|
||||||
|
"StaleNodes Report: [Remove StaleNode]: %s";
|
||||||
|
private static final int REPORT_STALE_NODE_NODES_PER_LINE = 10;
|
||||||
/**
|
/**
|
||||||
* Stores a subset of the datanodeMap in DatanodeManager,
|
* Stores a subset of the datanodeMap in DatanodeManager,
|
||||||
* containing nodes that are considered alive.
|
* containing nodes that are considered alive.
|
||||||
|
@ -56,14 +66,19 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
/** Statistics, which are synchronized by the heartbeat manager lock. */
|
/** Statistics, which are synchronized by the heartbeat manager lock. */
|
||||||
private final DatanodeStats stats = new DatanodeStats();
|
private final DatanodeStats stats = new DatanodeStats();
|
||||||
|
|
||||||
/** The time period to check for expired datanodes */
|
/** The time period to check for expired datanodes. */
|
||||||
private final long heartbeatRecheckInterval;
|
private final long heartbeatRecheckInterval;
|
||||||
/** Heartbeat monitor thread */
|
/** Heartbeat monitor thread. */
|
||||||
private final Daemon heartbeatThread = new Daemon(new Monitor());
|
private final Daemon heartbeatThread = new Daemon(new Monitor());
|
||||||
private final StopWatch heartbeatStopWatch = new StopWatch();
|
private final StopWatch heartbeatStopWatch = new StopWatch();
|
||||||
|
|
||||||
final Namesystem namesystem;
|
final Namesystem namesystem;
|
||||||
final BlockManager blockManager;
|
final BlockManager blockManager;
|
||||||
|
/** Enable log for datanode staleness. */
|
||||||
|
private final boolean enableLogStaleNodes;
|
||||||
|
|
||||||
|
/** reports for stale datanodes. */
|
||||||
|
private final Set<DatanodeDescriptor> staleDataNodes = new HashSet<>();
|
||||||
|
|
||||||
HeartbeatManager(final Namesystem namesystem,
|
HeartbeatManager(final Namesystem namesystem,
|
||||||
final BlockManager blockManager, final Configuration conf) {
|
final BlockManager blockManager, final Configuration conf) {
|
||||||
|
@ -78,6 +93,9 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
long staleInterval = conf.getLong(
|
long staleInterval = conf.getLong(
|
||||||
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
|
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
|
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
|
||||||
|
enableLogStaleNodes = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_KEY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_ENABLE_LOG_STALE_DATANODE_DEFAULT);
|
||||||
|
|
||||||
if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
|
if (avoidStaleDataNodesForWrite && staleInterval < recheckInterval) {
|
||||||
this.heartbeatRecheckInterval = staleInterval;
|
this.heartbeatRecheckInterval = staleInterval;
|
||||||
|
@ -228,6 +246,7 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
if (node.isAlive()) {
|
if (node.isAlive()) {
|
||||||
stats.subtract(node);
|
stats.subtract(node);
|
||||||
datanodes.remove(node);
|
datanodes.remove(node);
|
||||||
|
removeNodeFromStaleList(node);
|
||||||
node.setAlive(false);
|
node.setAlive(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -323,6 +342,59 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
return elapsed + offset > heartbeatRecheckInterval;
|
return elapsed + offset > heartbeatRecheckInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove deadNode from StaleNodeList if it exists.
|
||||||
|
* This method assumes that it is called inside a synchronized block.
|
||||||
|
*
|
||||||
|
* @param d node descriptor to be marked as dead.
|
||||||
|
* @return true if the node was already on the stale list.
|
||||||
|
*/
|
||||||
|
private boolean removeNodeFromStaleList(DatanodeDescriptor d) {
|
||||||
|
return removeNodeFromStaleList(d, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove node from StaleNodeList if it exists.
|
||||||
|
* If enabled, the log will show whether the node is removed from list because
|
||||||
|
* it is dead or not.
|
||||||
|
* This method assumes that it is called inside a synchronized block.
|
||||||
|
*
|
||||||
|
* @param d node descriptor to be marked as dead.
|
||||||
|
* @param isDead
|
||||||
|
* @return true if the node was already in the stale list.
|
||||||
|
*/
|
||||||
|
private boolean removeNodeFromStaleList(DatanodeDescriptor d,
|
||||||
|
boolean isDead) {
|
||||||
|
boolean result = false;
|
||||||
|
result = staleDataNodes.remove(d);
|
||||||
|
if (enableLogStaleNodes && result) {
|
||||||
|
LOG.info(String.format(isDead ?
|
||||||
|
REPORT_REMOVE_DEAD_NODE_ENTRY : REPORT_REMOVE_STALE_NODE_ENTRY,
|
||||||
|
d));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dump the new stale data nodes added since last heartbeat check.
|
||||||
|
*
|
||||||
|
* @param staleNodes list of datanodes added in the last heartbeat check.
|
||||||
|
*/
|
||||||
|
private void dumpStaleNodes(List<DatanodeDescriptor> staleNodes) {
|
||||||
|
// log nodes detected as stale
|
||||||
|
if (enableLogStaleNodes && (!staleNodes.isEmpty())) {
|
||||||
|
StringBuilder staleLogMSG =
|
||||||
|
new StringBuilder(String.format(REPORT_DELTA_STALE_DN_HEADER,
|
||||||
|
staleNodes.size()));
|
||||||
|
for (int ind = 0; ind < staleNodes.size(); ind++) {
|
||||||
|
String logFormat = (ind % REPORT_STALE_NODE_NODES_PER_LINE == 0) ?
|
||||||
|
REPORT_STALE_DN_LINE_ENTRY : REPORT_STALE_DN_LINE_TAIL;
|
||||||
|
staleLogMSG.append(String.format(logFormat, staleNodes.get(ind)));
|
||||||
|
}
|
||||||
|
LOG.info(staleLogMSG.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if there are any expired heartbeats, and if so,
|
* Check if there are any expired heartbeats, and if so,
|
||||||
* whether any blocks have to be re-replicated.
|
* whether any blocks have to be re-replicated.
|
||||||
|
@ -365,9 +437,9 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
// locate the first failed storage that isn't on a dead node.
|
// locate the first failed storage that isn't on a dead node.
|
||||||
DatanodeStorageInfo failedStorage = null;
|
DatanodeStorageInfo failedStorage = null;
|
||||||
|
|
||||||
// check the number of stale nodes
|
// check the number of stale storages
|
||||||
int numOfStaleNodes = 0;
|
|
||||||
int numOfStaleStorages = 0;
|
int numOfStaleStorages = 0;
|
||||||
|
List<DatanodeDescriptor> staleNodes = new ArrayList<>();
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
for (DatanodeDescriptor d : datanodes) {
|
for (DatanodeDescriptor d : datanodes) {
|
||||||
// check if an excessive GC pause has occurred
|
// check if an excessive GC pause has occurred
|
||||||
|
@ -377,13 +449,21 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
if (dead == null && dm.isDatanodeDead(d)) {
|
if (dead == null && dm.isDatanodeDead(d)) {
|
||||||
stats.incrExpiredHeartbeats();
|
stats.incrExpiredHeartbeats();
|
||||||
dead = d;
|
dead = d;
|
||||||
|
// remove the node from stale list to adjust the stale list size
|
||||||
|
// before setting the stale count of the DatanodeManager
|
||||||
|
removeNodeFromStaleList(d);
|
||||||
|
} else {
|
||||||
|
if (d.isStale(dm.getStaleInterval())) {
|
||||||
|
if (staleDataNodes.add(d)) {
|
||||||
|
// the node is n
|
||||||
|
staleNodes.add(d);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// remove the node if it is no longer stale
|
||||||
|
removeNodeFromStaleList(d, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (d.isStale(dm.getStaleInterval())) {
|
|
||||||
LOG.warn(String.format("Stale datanode {}."
|
|
||||||
+ " No heartbeat received since last {} milliseconds"),
|
|
||||||
d.getName(), dm.getStaleInterval());
|
|
||||||
numOfStaleNodes++;
|
|
||||||
}
|
|
||||||
DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
|
DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
|
||||||
for(DatanodeStorageInfo storageInfo : storageInfos) {
|
for(DatanodeStorageInfo storageInfo : storageInfos) {
|
||||||
if (storageInfo.areBlockContentsStale()) {
|
if (storageInfo.areBlockContentsStale()) {
|
||||||
|
@ -396,18 +476,21 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
failedStorage = storageInfo;
|
failedStorage = storageInfo;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the number of stale nodes in the DatanodeManager
|
// Set the number of stale nodes in the DatanodeManager
|
||||||
dm.setNumStaleNodes(numOfStaleNodes);
|
dm.setNumStaleNodes(staleDataNodes.size());
|
||||||
dm.setNumStaleStorages(numOfStaleStorages);
|
dm.setNumStaleStorages(numOfStaleStorages);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// log nodes detected as stale since last heartBeat
|
||||||
|
dumpStaleNodes(staleNodes);
|
||||||
|
|
||||||
allAlive = dead == null && failedStorage == null;
|
allAlive = dead == null && failedStorage == null;
|
||||||
if (!allAlive && namesystem.isInStartupSafeMode()) {
|
if (!allAlive && namesystem.isInStartupSafeMode()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dead != null) {
|
if (dead != null) {
|
||||||
// acquire the fsnamesystem lock, and then remove the dead node.
|
// acquire the fsnamesystem lock, and then remove the dead node.
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
|
|
|
@ -2040,6 +2040,14 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.enable.log.stale.datanode</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>
|
||||||
|
Enable and disable logging datanode staleness. Disabled by default.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.stale.datanode.interval</name>
|
<name>dfs.namenode.stale.datanode.interval</name>
|
||||||
<value>30000</value>
|
<value>30000</value>
|
||||||
|
|
Loading…
Reference in New Issue