HDFS-6772: Merging r1616680 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1616681 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 2014-08-08 05:42:00 +00:00
parent 481ac00c65
commit 8d73cbb8d8
10 changed files with 108 additions and 4 deletions


@@ -121,6 +121,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6722. Display readable last contact time for dead nodes on NN webUI.
     (Ming Ma via wheat9)

+    HDFS-6772. Get DN storages out of blockContentsStale state faster after
+    NN restarts. (Ming Ma via Arpit Agarwal)
+
   OPTIMIZATIONS

     HDFS-6690. Deduplicate xattr names in memory. (wang)


@@ -136,6 +136,9 @@ public class DatanodeManager {
   /** The number of stale DataNodes */
   private volatile int numStaleNodes;

+  /** The number of stale storages */
+  private volatile int numStaleStorages;
+
   /**
    * Whether or not this cluster has ever consisted of more than 1 rack,
    * according to the NetworkTopology.
@@ -1141,6 +1144,22 @@ public class DatanodeManager {
     return this.numStaleNodes;
   }

+  /**
+   * Get the number of content stale storages.
+   */
+  public int getNumStaleStorages() {
+    return numStaleStorages;
+  }
+
+  /**
+   * Set the number of content stale storages.
+   *
+   * @param numStaleStorages The number of content stale storages.
+   */
+  void setNumStaleStorages(int numStaleStorages) {
+    this.numStaleStorages = numStaleStorages;
+  }
+
   /** Fetch live and dead datanodes. */
   public void fetchDatanodes(final List<DatanodeDescriptor> live,
       final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {


@@ -256,6 +256,7 @@ class HeartbeatManager implements DatanodeStatistics {
       DatanodeID dead = null;
       // check the number of stale nodes
       int numOfStaleNodes = 0;
+      int numOfStaleStorages = 0;
       synchronized(this) {
         for (DatanodeDescriptor d : datanodes) {
           if (dead == null && dm.isDatanodeDead(d)) {
@@ -265,10 +266,17 @@ class HeartbeatManager implements DatanodeStatistics {
           if (d.isStale(dm.getStaleInterval())) {
             numOfStaleNodes++;
           }
+          DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
+          for(DatanodeStorageInfo storageInfo : storageInfos) {
+            if (storageInfo.areBlockContentsStale()) {
+              numOfStaleStorages++;
+            }
+          }
         }

         // Set the number of stale nodes in the DatanodeManager
         dm.setNumStaleNodes(numOfStaleNodes);
+        dm.setNumStaleStorages(numOfStaleStorages);
       }

       allAlive = dead == null;


@@ -602,7 +602,7 @@ class BPOfferService {
       LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr
           + " with " + actor.state + " state");
       actor.reRegister();
-      return true;
+      return false;
     }
     writeLock();
     try {


@@ -223,6 +223,18 @@ class BPServiceActor implements Runnable {
     register();
   }

+  // This is useful to make sure NN gets Heartbeat before Blockreport
+  // upon NN restart while DN keeps retrying. Otherwise,
+  // 1. NN restarts.
+  // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
+  // 3. After reregistration completes, DN will send Blockreport first.
+  // 4. Since NN receives that Blockreport before any post-restart Heartbeat,
+  //    it won't mark DatanodeStorageInfo#blockContentsStale to false until
+  //    the next Blockreport.
+  void scheduleHeartbeat() {
+    lastHeartbeat = 0;
+  }
+
   /**
    * This method arranges for the data node to send the block report at
    * the next heartbeat.
@@ -902,6 +914,7 @@ class BPServiceActor implements Runnable {
       retrieveNamespaceInfo();
       // and re-register
       register();
+      scheduleHeartbeat();
     }
   }
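The reason resetting lastHeartbeat works is the shape of the datanode's offer loop: a heartbeat is sent only once the configured interval has elapsed since lastHeartbeat, so a zeroed timestamp makes that check pass on the very next iteration, ahead of the block report queued by re-registration. The following self-contained sketch illustrates only that ordering; the class, field, and method names are illustrative and are not the real BPServiceActor API.

// Illustrative sketch only: not the actual BPServiceActor#offerService code.
public class OfferLoopSketch {
  static final long HEARTBEAT_INTERVAL_MS = 3000;

  long lastHeartbeat;          // time the last heartbeat was sent
  boolean blockReportPending;  // re-registration schedules a block report

  // Mirrors the method added in this commit.
  void scheduleHeartbeat() {
    lastHeartbeat = 0;
  }

  void runOneIteration(long now) {
    if (now - lastHeartbeat >= HEARTBEAT_INTERVAL_MS) {
      lastHeartbeat = now;
      System.out.println("send heartbeat");     // NN sees the heartbeat first...
    }
    if (blockReportPending) {
      blockReportPending = false;
      System.out.println("send block report");  // ...so this report can clear the stale flag
    }
  }

  public static void main(String[] args) {
    OfferLoopSketch actor = new OfferLoopSketch();
    actor.lastHeartbeat = System.currentTimeMillis(); // a heartbeat only just went out
    actor.blockReportPending = true;                  // re-registration queued a block report
    actor.scheduleHeartbeat();                        // the fix: force an immediate heartbeat
    actor.runOneIteration(System.currentTimeMillis());
    // Without scheduleHeartbeat(), this iteration would send only the block report.
  }
}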


@@ -6092,7 +6092,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }

-
   @Override // FSNamesystemMBean
   public int getNumLiveDataNodes() {
     return getBlockManager().getDatanodeManager().getNumLiveDataNodes();
@@ -6138,6 +6137,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return getBlockManager().getDatanodeManager().getNumStaleNodes();
   }

+  /**
+   * Storages are marked as "content stale" after NN restarts or fails over,
+   * and before NN receives the first Heartbeat followed by the first
+   * Blockreport.
+   */
+  @Override // FSNamesystemMBean
+  public int getNumStaleStorages() {
+    return getBlockManager().getDatanodeManager().getNumStaleStorages();
+  }
+
   /**
    * Sets the current generation stamp for legacy blocks
    */


@@ -151,4 +151,11 @@ public interface FSNamesystemMBean {
    * @return number of blocks pending deletion
    */
   long getPendingDeletionBlocks();
+
+  /**
+   * Number of content stale storages.
+   * @return number of content stale storages
+   */
+  public int getNumStaleStorages();
 }
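Operationally, the new gauge is exposed through the FSNamesystemState MBean, which the tests further below read in-process; an external monitoring client can read the same attribute over remote JMX. Below is a minimal sketch of such a client. The MBean name and attribute name come from this change, but the host, port, and the NameNode being started with remote JMX enabled are assumptions.

// Minimal sketch: read the NumStaleStorages gauge from a running NameNode over
// remote JMX. The endpoint is hypothetical; remote JMX must be enabled on the NN.
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class StaleStorageProbe {
  public static void main(String[] args) throws Exception {
    // Hypothetical endpoint; replace with the NameNode's JMX host and port.
    JMXServiceURL url = new JMXServiceURL(
        "service:jmx:rmi:///jndi/rmi://nn-host:8004/jmxrmi");
    try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection mbs = connector.getMBeanServerConnection();
      ObjectName fsns = new ObjectName(
          "Hadoop:service=NameNode,name=FSNamesystemState");
      // Attribute backed by FSNamesystemMBean#getNumStaleStorages() from this change.
      Integer numStaleStorages =
          (Integer) mbs.getAttribute(fsns, "NumStaleStorages");
      System.out.println("NumStaleStorages = " + numStaleStorages);
    }
  }
}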


@@ -22,6 +22,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 /**
  * A BlockCommand is an instruction to a datanode to register with the namenode.
+ * This command can't be combined with other commands in the same response.
+ * This is because after the datanode processes RegisterCommand, it will skip
+ * the rest of the DatanodeCommands in the same HeartbeatResponse.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
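To make the constraint in that javadoc concrete: the datanode walks the commands from one heartbeat response in order, and the first handler that returns false (which is what the DNA_REGISTER handler now does, per the BPOfferService change above) stops the loop, so later commands in the same response are skipped. The sketch below is self-contained and illustrative only; the names are not the actual BPOfferService/BPServiceActor API.

// Illustrative sketch of the command-processing pattern; names are not the real API.
public class CommandLoopSketch {
  interface Command {
    boolean process(); // return false to stop processing the rest of this response
  }

  static boolean processCommands(Command[] cmds) {
    if (cmds != null) {
      for (Command cmd : cmds) {
        if (!cmd.process()) {
          return false; // remaining commands in this response are skipped
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Command register = () -> { System.out.println("re-register"); return false; };
    Command invalidate = () -> { System.out.println("never runs"); return true; };
    // Only "re-register" is printed; the command after it in the same response is dropped.
    System.out.println("processed all = "
        + processCommands(new Command[] { register, invalidate }));
  }
}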


@@ -94,6 +94,8 @@ public class TestFSNamesystemMBean {
         "SnapshotStats"));
     Long MaxObjects = (Long) (mbs.getAttribute(mxbeanNameFsns,
         "MaxObjects"));
+    Integer numStaleStorages = (Integer) (mbs.getAttribute(
+        mxbeanNameFsns, "NumStaleStorages"));

     // Metrics that belong to "NameNodeInfo".
     // These are metrics that FSNamesystem registers directly with MBeanServer.


@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;

 import java.io.File;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
 import java.util.ArrayList;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@@ -66,6 +68,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 /**
  * Startup and checkpoint tests
  *
@@ -699,4 +704,40 @@ public class TestStartup {
       }
     }
   }
+
+  /**
+   * Verify the following scenario.
+   * 1. NN restarts.
+   * 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister.
+   * 3. After reregistration completes, DN will send Heartbeat, followed by
+   *    Blockreport.
+   * 4. NN will mark DatanodeStorageInfo#blockContentsStale to false.
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testStorageBlockContentsStaleAfterNNRestart() throws Exception {
+    MiniDFSCluster dfsCluster = null;
+    try {
+      Configuration config = new Configuration();
+      dfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
+      dfsCluster.waitActive();
+      dfsCluster.restartNameNode(true);
+      BlockManagerTestUtil.checkHeartbeat(
+          dfsCluster.getNamesystem().getBlockManager());
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      ObjectName mxbeanNameFsns = new ObjectName(
+          "Hadoop:service=NameNode,name=FSNamesystemState");
+      Integer numStaleStorages = (Integer) (mbs.getAttribute(
+          mxbeanNameFsns, "NumStaleStorages"));
+      assertEquals(0, numStaleStorages.intValue());
+    } finally {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+      }
+    }
+    return;
+  }
 }