diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7ca9560a114..21ae6260bc6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -49,6 +49,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-376. Fixes a bug which would prevent the NM knowing about completed
containers and applications. (Jason Lowe via sseth)
+ YARN-196. Nodemanager should be more robust in handling connection failure
+ to ResourceManager when a cluster is started (Xuan Gong via hitesh)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fcd22cbc8b7..6a900790a1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -621,6 +621,20 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS =
2000;
+ /** Max time to wait to establish a connection to RM when NM starts
+ */
+ public static final String RESOURCEMANAGER_CONNECT_WAIT_SECS =
+ NM_PREFIX + "resourcemanager.connect.wait.secs";
+ public static final int DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS =
+ 15*60;
+
+ /** Time interval between each NM attempt to connect to RM
+ */
+ public static final String RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS =
+ NM_PREFIX + "resourcemanager.connect.retry_interval.secs";
+ public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+ = 30;
+
/**
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
* entries
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 588bb1bccfc..08936a3507a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -597,6 +597,20 @@
2000
+
+ Max time, in seconds, to wait to establish a connection to RM when NM starts.
+ The NM will shut down if it cannot connect to the RM within the specified max time period.
+ If the value is set to -1, then the NM will retry forever.
+ yarn.nodemanager.resourcemanager.connect.wait.secs
+ 900
+
+
+
+ Time interval, in seconds, between each NM attempt to connect to RM.
+ yarn.nodemanager.resourcemanager.connect.retry_interval.secs
+ 30
+
+
yarn.nodemanager.aux-services.mapreduce.shuffle.class
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 841e16997da..e3dae41603a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -151,7 +151,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_NM_WEBAPP_PORT);
try {
- // this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
this.httpPort = httpBindAddress.getPort();
// Registration has to be in start so that ContainerManager can get the
// perNM tokens needed to authenticate ContainerTokens.
@@ -189,15 +188,84 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
private void registerWithRM() throws YarnRemoteException {
- this.resourceTracker = getRMClient();
- LOG.info("Connecting to ResourceManager at " + this.rmAddress);
-
- RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+ Configuration conf = getConfig();
+ long rmConnectWaitMS =
+ conf.getInt(
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+ YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
+ * 1000;
+ long rmConnectionRetryIntervalMS =
+ conf.getLong(
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+ YarnConfiguration
+ .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
+ * 1000;
+
+ if(rmConnectionRetryIntervalMS < 0) {
+ throw new YarnException("Invalid Configuration. " +
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
+ " should not be negative.");
+ }
+
+ boolean waitForEver = (rmConnectWaitMS == -1000);
+
+ if(! waitForEver) {
+ if(rmConnectWaitMS < 0) {
+ throw new YarnException("Invalid Configuration. " +
+ YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS +
+ " can be -1, but can not be other negative numbers");
+ }
+
+ //wait time is shorter than one retry interval: only try to connect once
+ if(rmConnectWaitMS < rmConnectionRetryIntervalMS) {
+ LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
+ + " is smaller than "
+ + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+ + ". Only try connect once.");
+ rmConnectWaitMS = 0;
+ }
+ }
+
+ int rmRetryCount = 0;
+ long waitStartTime = System.currentTimeMillis();
+
+ RegisterNodeManagerRequest request =
+ recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
request.setHttpPort(this.httpPort);
request.setResource(this.totalResource);
request.setNodeId(this.nodeId);
- RegistrationResponse regResponse =
- this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
+ RegistrationResponse regResponse;
+
+ while(true) {
+ try {
+ rmRetryCount++;
+ LOG.info("Connecting to ResourceManager at " + this.rmAddress
+ + ". current no. of attempts is " + rmRetryCount);
+ this.resourceTracker = getRMClient();
+ regResponse =
+ this.resourceTracker.registerNodeManager(request)
+ .getRegistrationResponse();
+ break;
+ } catch(Throwable e) {
+ LOG.warn("Trying to connect to ResourceManager, " +
+ "current no. of failed attempts is "+rmRetryCount);
+ if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
+ || waitForEver) {
+ try {
+ LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
+ + " seconds before next connection retry to RM");
+ Thread.sleep(rmConnectionRetryIntervalMS);
+ } catch(InterruptedException ex) {
+ // ignored; loop will retry. NOTE(review): should restore interrupt status via Thread.currentThread().interrupt()
+ }
+ } else {
+ String errorMessage = "Failed to Connect to RM, " +
+ "no. of failed attempts is "+rmRetryCount;
+ LOG.error(errorMessage,e);
+ throw new YarnException(errorMessage,e);
+ }
+ }
+ }
// if the Resourcemanager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
throw new YarnException(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index d65b096bbe8..ff9e0824efc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -267,6 +267,36 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
+ public ResourceTracker resourceTracker =
+ new MyResourceTracker(this.context);
+ private Context context;
+ private final long waitStartTime;
+ private final long rmStartIntervalMS;
+ private final boolean rmNeverStart;
+
+ public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
+ long rmStartIntervalMS, boolean rmNeverStart) {
+ super(context, dispatcher, healthChecker, metrics);
+ this.context = context;
+ this.waitStartTime = System.currentTimeMillis();
+ this.rmStartIntervalMS = rmStartIntervalMS;
+ this.rmNeverStart = rmNeverStart;
+ }
+
+ @Override
+ protected ResourceTracker getRMClient() {
+ if(System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
+ || rmNeverStart) {
+ throw new YarnException("Faking RM start failure as start " +
+ "delay timer has not expired.");
+ } else {
+ return resourceTracker;
+ }
+ }
+ }
+
private class MyNodeManager extends NodeManager {
private MyNodeStatusUpdater3 nodeStatusUpdater;
@@ -580,6 +610,73 @@ public class TestNodeStatusUpdater {
+ "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
}
+ @Test (timeout = 15000)
+ public void testNMConnectionToRM() {
+ final long delta = 1500;
+ final long connectionWaitSecs = 5;
+ final long connectionRetryIntervalSecs = 1;
+ //Waiting for rmStartIntervalMS, RM will be started
+ final long rmStartIntervalMS = 2*1000;
+ YarnConfiguration conf = createNMConfig();
+ conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+ connectionWaitSecs);
+ conf.setLong(YarnConfiguration
+ .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+ connectionRetryIntervalSecs);
+
+ //Test that the NM tries to connect to the RM several times, but finally fails
+ nm = new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
+ context, dispatcher, healthChecker, metrics,
+ rmStartIntervalMS, true);
+ return nodeStatusUpdater;
+ }
+ };
+ nm.init(conf);
+ long waitStartTime = System.currentTimeMillis();
+ try {
+ nm.start();
+ Assert.fail("NM should have failed to start due to RM connect failure");
+ } catch(Exception e) {
+ Assert.assertTrue("NM should have tried re-connecting to RM during " +
+ "period of at least " + connectionWaitSecs + " seconds, but " +
+ "stopped retrying within " + (connectionWaitSecs + delta/1000) +
+ " seconds", (System.currentTimeMillis() - waitStartTime
+ >= connectionWaitSecs*1000) && (System.currentTimeMillis()
+ - waitStartTime < (connectionWaitSecs*1000+delta)));
+ }
+
+ //Test that the NM connects to the RM: the first several attempts fail,
+ //but it finally succeeds.
+ nm = new NodeManager() {
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
+ context, dispatcher, healthChecker, metrics, rmStartIntervalMS,
+ false);
+ return nodeStatusUpdater;
+ }
+ };
+
+ nm.init(conf);
+ waitStartTime = System.currentTimeMillis();
+ try {
+ nm.start();
+ } catch (Exception ex){
+ Assert.fail("NM should have started successfully " +
+ "after connecting to RM.");
+ }
+ Assert.assertTrue("NM should have connected to RM within " + delta/1000
+ +" seconds of RM starting up.",
+ (System.currentTimeMillis() - waitStartTime >= rmStartIntervalMS)
+ && (System.currentTimeMillis() - waitStartTime
+ < (rmStartIntervalMS+delta)));
+ }
+
/**
* Verifies that if for some reason NM fails to start ContainerManager RPC
* server, RM is oblivious to NM's presence. The behaviour is like this