HDFS-6772. Get DN storages out of blockContentsStale state faster after NN restarts. (Contributed by Ming Ma)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1616680 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
be9c67930b
commit
d3a2fe2807
|
@ -379,6 +379,9 @@ Release 2.6.0 - UNRELEASED
|
|||
HDFS-6722. Display readable last contact time for dead nodes on NN webUI.
|
||||
(Ming Ma via wheat9)
|
||||
|
||||
HDFS-6772. Get DN storages out of blockContentsStale state faster after
|
||||
NN restarts. (Ming Ma via Arpit Agarwal)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||
|
|
|
@ -136,6 +136,9 @@ public class DatanodeManager {
|
|||
/** The number of stale DataNodes */
|
||||
private volatile int numStaleNodes;
|
||||
|
||||
/** The number of stale storages */
|
||||
private volatile int numStaleStorages;
|
||||
|
||||
/**
|
||||
* Whether or not this cluster has ever consisted of more than 1 rack,
|
||||
* according to the NetworkTopology.
|
||||
|
@ -1142,6 +1145,22 @@ public class DatanodeManager {
|
|||
return this.numStaleNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of content stale storages.
|
||||
*/
|
||||
public int getNumStaleStorages() {
|
||||
return numStaleStorages;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the number of content stale storages.
|
||||
*
|
||||
* @param numStaleStorages The number of content stale storages.
|
||||
*/
|
||||
void setNumStaleStorages(int numStaleStorages) {
|
||||
this.numStaleStorages = numStaleStorages;
|
||||
}
|
||||
|
||||
/** Fetch live and dead datanodes. */
|
||||
public void fetchDatanodes(final List<DatanodeDescriptor> live,
|
||||
final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
|
||||
|
|
|
@ -256,6 +256,7 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
DatanodeID dead = null;
|
||||
// check the number of stale nodes
|
||||
int numOfStaleNodes = 0;
|
||||
int numOfStaleStorages = 0;
|
||||
synchronized(this) {
|
||||
for (DatanodeDescriptor d : datanodes) {
|
||||
if (dead == null && dm.isDatanodeDead(d)) {
|
||||
|
@ -265,10 +266,17 @@ class HeartbeatManager implements DatanodeStatistics {
|
|||
if (d.isStale(dm.getStaleInterval())) {
|
||||
numOfStaleNodes++;
|
||||
}
|
||||
DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
|
||||
for(DatanodeStorageInfo storageInfo : storageInfos) {
|
||||
if (storageInfo.areBlockContentsStale()) {
|
||||
numOfStaleStorages++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set the number of stale nodes in the DatanodeManager
|
||||
dm.setNumStaleNodes(numOfStaleNodes);
|
||||
dm.setNumStaleStorages(numOfStaleStorages);
|
||||
}
|
||||
|
||||
allAlive = dead == null;
|
||||
|
|
|
@ -601,7 +601,7 @@ class BPOfferService {
|
|||
LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr
|
||||
+ " with " + actor.state + " state");
|
||||
actor.reRegister();
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
writeLock();
|
||||
try {
|
||||
|
|
|
@ -223,6 +223,18 @@ class BPServiceActor implements Runnable {
|
|||
register();
|
||||
}
|
||||
|
||||
// This is useful to make sure NN gets Heartbeat before Blockreport
|
||||
// upon NN restart while DN keeps retrying Otherwise,
|
||||
// 1. NN restarts.
|
||||
// 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
|
||||
// 3. After reregistration completes, DN will send Blockreport first.
|
||||
// 4. Given NN receives Blockreport after Heartbeat, it won't mark
|
||||
// DatanodeStorageInfo#blockContentsStale to false until the next
|
||||
// Blockreport.
|
||||
void scheduleHeartbeat() {
|
||||
lastHeartbeat = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* This methods arranges for the data node to send the block report at
|
||||
* the next heartbeat.
|
||||
|
@ -902,6 +914,7 @@ class BPServiceActor implements Runnable {
|
|||
retrieveNamespaceInfo();
|
||||
// and re-register
|
||||
register();
|
||||
scheduleHeartbeat();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6089,7 +6089,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Override // FSNamesystemMBean
|
||||
public int getNumLiveDataNodes() {
|
||||
return getBlockManager().getDatanodeManager().getNumLiveDataNodes();
|
||||
|
@ -6135,6 +6134,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
return getBlockManager().getDatanodeManager().getNumStaleNodes();
|
||||
}
|
||||
|
||||
/**
|
||||
* Storages are marked as "content stale" after NN restart or fails over and
|
||||
* before NN receives the first Heartbeat followed by the first Blockreport.
|
||||
*/
|
||||
@Override // FSNamesystemMBean
|
||||
public int getNumStaleStorages() {
|
||||
return getBlockManager().getDatanodeManager().getNumStaleStorages();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the current generation stamp for legacy blocks
|
||||
*/
|
||||
|
|
|
@ -151,4 +151,11 @@ public interface FSNamesystemMBean {
|
|||
* @return number of blocks pending deletion
|
||||
*/
|
||||
long getPendingDeletionBlocks();
|
||||
|
||||
/**
|
||||
* Number of content stale storages.
|
||||
* @return number of content stale storages
|
||||
*/
|
||||
public int getNumStaleStorages();
|
||||
|
||||
}
|
||||
|
|
|
@ -22,6 +22,9 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
|
||||
/**
|
||||
* A BlockCommand is an instruction to a datanode to register with the namenode.
|
||||
* This command can't be combined with other commands in the same response.
|
||||
* This is because after the datanode processes RegisterCommand, it will skip
|
||||
* the rest of the DatanodeCommands in the same HeartbeatResponse.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
|
|
|
@ -94,6 +94,8 @@ public class TestFSNamesystemMBean {
|
|||
"SnapshotStats"));
|
||||
Long MaxObjects = (Long) (mbs.getAttribute(mxbeanNameFsns,
|
||||
"MaxObjects"));
|
||||
Integer numStaleStorages = (Integer) (mbs.getAttribute(
|
||||
mxbeanNameFsns, "NumStaleStorages"));
|
||||
|
||||
// Metrics that belong to "NameNodeInfo".
|
||||
// These are metrics that FSNamesystem registers directly with MBeanServer.
|
||||
|
|
|
@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
|
@ -49,6 +50,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
|
@ -64,6 +66,9 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
/**
|
||||
* Startup and checkpoint tests
|
||||
*
|
||||
|
@ -684,4 +689,40 @@ public class TestStartup {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Verify the following scenario.
|
||||
* 1. NN restarts.
|
||||
* 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
|
||||
* 3. After reregistration completes, DN will send Heartbeat, followed by
|
||||
* Blockreport.
|
||||
* 4. NN will mark DatanodeStorageInfo#blockContentsStale to false.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 60000)
|
||||
public void testStorageBlockContentsStaleAfterNNRestart() throws Exception {
|
||||
MiniDFSCluster dfsCluster = null;
|
||||
try {
|
||||
Configuration config = new Configuration();
|
||||
dfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
|
||||
dfsCluster.waitActive();
|
||||
dfsCluster.restartNameNode(true);
|
||||
BlockManagerTestUtil.checkHeartbeat(
|
||||
dfsCluster.getNamesystem().getBlockManager());
|
||||
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
||||
ObjectName mxbeanNameFsns = new ObjectName(
|
||||
"Hadoop:service=NameNode,name=FSNamesystemState");
|
||||
Integer numStaleStorages = (Integer) (mbs.getAttribute(
|
||||
mxbeanNameFsns, "NumStaleStorages"));
|
||||
assertEquals(0, numStaleStorages.intValue());
|
||||
} finally {
|
||||
if (dfsCluster != null) {
|
||||
dfsCluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue