From a21de508a147f6f920e2faad94173c8ddc2d9f0a Mon Sep 17 00:00:00 2001 From: Hitesh Shah Date: Fri, 15 Mar 2013 18:07:04 +0000 Subject: [PATCH] merge -c 1457038 from trunk to branch-2 to fix YARN-196. Nodemanager should be more robust in handling connection failure to ResourceManager when a cluster is started. Contributed by Xuan Gong. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1457041 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 14 +++ .../src/main/resources/yarn-default.xml | 14 +++ .../nodemanager/NodeStatusUpdaterImpl.java | 82 ++++++++++++++-- .../nodemanager/TestNodeStatusUpdater.java | 97 +++++++++++++++++++ 5 files changed, 203 insertions(+), 7 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7ca9560a114..21ae6260bc6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -49,6 +49,9 @@ Release 2.0.5-beta - UNRELEASED YARN-376. Fixes a bug which would prevent the NM knowing about completed containers and applications. (Jason Lowe via sseth) + YARN-196. 
Nodemanager should be more robust in handling connection failure + to ResourceManager when a cluster is started (Xuan Gong via hitesh) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fcd22cbc8b7..6a900790a1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -621,6 +621,20 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS = 2000; + /** Max time to wait to establish a connection to RM when NM starts + */ + public static final String RESOURCEMANAGER_CONNECT_WAIT_SECS = + NM_PREFIX + "resourcemanager.connect.wait.secs"; + public static final int DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS = + 15*60; + + /** Time interval between each NM attempt to connect to RM + */ + public static final String RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS = + NM_PREFIX + "resourcemanager.connect.retry_interval.secs"; + public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + = 30; + /** * CLASSPATH for YARN applications. 
A comma-separated list of CLASSPATH * entries diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 588bb1bccfc..08936a3507a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -597,6 +597,20 @@ 2000 + + Max time, in seconds, to wait to establish a connection to RM when NM starts. + The NM will shut down if it cannot connect to RM within the specified max time period. + If the value is set to -1, then NM will retry forever. + yarn.nodemanager.resourcemanager.connect.wait.secs + 900 + + + + Time interval, in seconds, between each NM attempt to connect to RM. + yarn.nodemanager.resourcemanager.connect.retry_interval.secs + 30 + + yarn.nodemanager.aux-services.mapreduce.shuffle.class diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 841e16997da..e3dae41603a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -151,7 +151,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_NM_WEBAPP_PORT); try { - // this.hostName = InetAddress.getLocalHost().getCanonicalHostName(); this.httpPort = httpBindAddress.getPort();
// Registration has to be in start so that ContainerManager can get the // perNM tokens needed to authenticate ContainerTokens. @@ -189,15 +188,84 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } private void registerWithRM() throws YarnRemoteException { - this.resourceTracker = getRMClient(); - LOG.info("Connecting to ResourceManager at " + this.rmAddress); - - RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); + Configuration conf = getConfig(); + long rmConnectWaitMS = + conf.getInt( + YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, + YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) + * 1000; + long rmConnectionRetryIntervalMS = + conf.getLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, + YarnConfiguration + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS) + * 1000; + + if(rmConnectionRetryIntervalMS < 0) { + throw new YarnException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + + " should not be negative."); + } + + boolean waitForEver = (rmConnectWaitMS == -1000); + + if(! waitForEver) { + if(rmConnectWaitMS < 0) { + throw new YarnException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + + " can be -1, but can not be other negative numbers"); + } + + //try connect once + if(rmConnectWaitMS < rmConnectionRetryIntervalMS) { + LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS + + " is smaller than " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS + + ". 
Only try connect once."); + rmConnectWaitMS = 0; + } + } + + int rmRetryCount = 0; + long waitStartTime = System.currentTimeMillis(); + + RegisterNodeManagerRequest request = + recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); request.setHttpPort(this.httpPort); request.setResource(this.totalResource); request.setNodeId(this.nodeId); - RegistrationResponse regResponse = - this.resourceTracker.registerNodeManager(request).getRegistrationResponse(); + RegistrationResponse regResponse; + + while(true) { + try { + rmRetryCount++; + LOG.info("Connecting to ResourceManager at " + this.rmAddress + + ". current no. of attempts is " + rmRetryCount); + this.resourceTracker = getRMClient(); + regResponse = + this.resourceTracker.registerNodeManager(request) + .getRegistrationResponse(); + break; + } catch(Throwable e) { + LOG.warn("Trying to connect to ResourceManager, " + + "current no. of failed attempts is "+rmRetryCount); + if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS + || waitForEver) { + try { + LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 + + " seconds before next connection retry to RM"); + Thread.sleep(rmConnectionRetryIntervalMS); + } catch(InterruptedException ex) { + //done nothing + } + } else { + String errorMessage = "Failed to Connect to RM, " + + "no. of failed attempts is "+rmRetryCount; + LOG.error(errorMessage,e); + throw new YarnException(errorMessage,e); + } + } + } // if the Resourcemanager instructs NM to shutdown. 
if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) { throw new YarnException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index d65b096bbe8..ff9e0824efc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -267,6 +267,36 @@ public class TestNodeStatusUpdater { } } + private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl { + public ResourceTracker resourceTracker = + new MyResourceTracker(this.context); + private Context context; + private final long waitStartTime; + private final long rmStartIntervalMS; + private final boolean rmNeverStart; + + public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + long rmStartIntervalMS, boolean rmNeverStart) { + super(context, dispatcher, healthChecker, metrics); + this.context = context; + this.waitStartTime = System.currentTimeMillis(); + this.rmStartIntervalMS = rmStartIntervalMS; + this.rmNeverStart = rmNeverStart; + } + + @Override + protected ResourceTracker getRMClient() { + if(System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS + || rmNeverStart) { + throw new YarnException("Faking RM start failure as start " + + "delay timer has not expired."); + } else { + return resourceTracker; + } + } + } + private class MyNodeManager extends NodeManager { private MyNodeStatusUpdater3 nodeStatusUpdater; @@ -580,6 +610,73 @@ public 
class TestNodeStatusUpdater { + "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed"); } + @Test (timeout = 15000) + public void testNMConnectionToRM() { + final long delta = 1500; + final long connectionWaitSecs = 5; + final long connectionRetryIntervalSecs = 1; + //Waiting for rmStartIntervalMS, RM will be started + final long rmStartIntervalMS = 2*1000; + YarnConfiguration conf = createNMConfig(); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, + connectionWaitSecs); + conf.setLong(YarnConfiguration + .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, + connectionRetryIntervalSecs); + + //Test NM try to connect to RM Several times, but finally fail + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( + context, dispatcher, healthChecker, metrics, + rmStartIntervalMS, true); + return nodeStatusUpdater; + } + }; + nm.init(conf); + long waitStartTime = System.currentTimeMillis(); + try { + nm.start(); + Assert.fail("NM should have failed to start due to RM connect failure"); + } catch(Exception e) { + Assert.assertTrue("NM should have tried re-connecting to RM during " + + "period of at least " + connectionWaitSecs + " seconds, but " + + "stopped retrying within " + (connectionWaitSecs + delta/1000) + + " seconds", (System.currentTimeMillis() - waitStartTime + >= connectionWaitSecs*1000) && (System.currentTimeMillis() + - waitStartTime < (connectionWaitSecs*1000+delta))); + } + + //Test NM connect to RM, fail at first several attempts, + //but finally success. 
+ nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4( + context, dispatcher, healthChecker, metrics, rmStartIntervalMS, + false); + return nodeStatusUpdater; + } + }; + + nm.init(conf); + waitStartTime = System.currentTimeMillis(); + try { + nm.start(); + } catch (Exception ex){ + Assert.fail("NM should have started successfully " + + "after connecting to RM."); + } + Assert.assertTrue("NM should have connected to RM within " + delta/1000 + +" seconds of RM starting up.", + (System.currentTimeMillis() - waitStartTime >= rmStartIntervalMS) + && (System.currentTimeMillis() - waitStartTime + < (rmStartIntervalMS+delta))); + } + /** * Verifies that if for some reason NM fails to start ContainerManager RPC * server, RM is oblivious to NM's presence. The behaviour is like this