merge -c 1457038 from trunk to branch-2 to fix YARN-196. Nodemanager should be more robust in handling connection failure to ResourceManager when a cluster is started. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1457041 13f79535-47bb-0310-9956-ffa450edef68
Hitesh Shah 2013-03-15 18:07:04 +00:00
parent 5f0e779663
commit a21de508a1
5 changed files with 203 additions and 7 deletions

CHANGES.txt

@@ -49,6 +49,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-376. Fixes a bug which would prevent the NM knowing about completed
     containers and applications. (Jason Lowe via sseth)
 
+    YARN-196. Nodemanager should be more robust in handling connection failure
+    to ResourceManager when a cluster is started (Xuan Gong via hitesh)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

YarnConfiguration.java

@@ -621,6 +621,20 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_NM_PROCESS_KILL_WAIT_MS =
       2000;
 
+  /** Max time to wait to establish a connection to RM when NM starts
+   */
+  public static final String RESOURCEMANAGER_CONNECT_WAIT_SECS =
+      NM_PREFIX + "resourcemanager.connect.wait.secs";
+  public static final int DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS =
+      15*60;
+
+  /** Time interval between each NM attempt to connect to RM
+   */
+  public static final String RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS =
+      NM_PREFIX + "resourcemanager.connect.retry_interval.secs";
+  public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+      = 30;
+
   /**
    * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
    * entries
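Note that both new keys are expressed in seconds while the status updater works in milliseconds. A minimal sketch of reading them the same way registerWithRM does below (the class name RmConnectConfigDemo is ours; everything else comes from the hunk above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RmConnectConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Seconds in the config, milliseconds in the updater: multiply by 1000,
    // exactly as registerWithRM does.
    long waitMS = conf.getInt(
        YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
        YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) * 1000L;
    long intervalMS = conf.getLong(
        YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
        YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
        * 1000L;
    // With the defaults this prints wait=900000ms, retry interval=30000ms.
    System.out.println("wait=" + waitMS + "ms, retry interval=" + intervalMS + "ms");
  }
}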

yarn-default.xml

@@ -597,6 +597,20 @@
     <value>2000</value>
   </property>
 
+  <property>
+    <description>Max time, in seconds, to wait to establish a connection to RM when NM starts.
+    The NM will shutdown if it cannot connect to RM within the specified max time period.
+    If the value is set as -1, then NM will retry forever.</description>
+    <name>yarn.nodemanager.resourcemanager.connect.wait.secs</name>
+    <value>900</value>
+  </property>
+
+  <property>
+    <description>Time interval, in seconds, between each NM attempt to connect to RM.</description>
+    <name>yarn.nodemanager.resourcemanager.connect.retry_interval.secs</name>
+    <value>30</value>
+  </property>
+
   <!--Map Reduce configuration-->
   <property>
     <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
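Operators would normally override these defaults in yarn-site.xml; programmatically the same override looks like the sketch below (the -1 "retry forever" semantics are taken from the description above, and the test added later in this commit uses the same setLong calls):

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RetryForeverConfigDemo {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // -1 seconds disables the overall deadline: the NM keeps retrying forever.
    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, -1);
    // Keep probing the RM every 30 seconds; this value must not be negative.
    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, 30);
  }
}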

NodeStatusUpdaterImpl.java

@@ -151,7 +151,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS,
         YarnConfiguration.DEFAULT_NM_WEBAPP_PORT);
     try {
-      // this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
       this.httpPort = httpBindAddress.getPort();
       // Registration has to be in start so that ContainerManager can get the
       // perNM tokens needed to authenticate ContainerTokens.
@@ -189,15 +188,84 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   }
 
   private void registerWithRM() throws YarnRemoteException {
-    this.resourceTracker = getRMClient();
-    LOG.info("Connecting to ResourceManager at " + this.rmAddress);
+    Configuration conf = getConfig();
+    long rmConnectWaitMS =
+        conf.getInt(
+            YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+            YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
+        * 1000;
+    long rmConnectionRetryIntervalMS =
+        conf.getLong(
+            YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+            YarnConfiguration
+                .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
+        * 1000;
 
-    RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+    if(rmConnectionRetryIntervalMS < 0) {
+      throw new YarnException("Invalid Configuration. " +
+          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
+          " should not be negative.");
+    }
+
+    boolean waitForEver = (rmConnectWaitMS == -1000);
+
+    if(!waitForEver) {
+      if(rmConnectWaitMS < 0) {
+        throw new YarnException("Invalid Configuration. " +
+            YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS +
+            " can be -1, but can not be other negative numbers");
+      }
+
+      // try connect once
+      if(rmConnectWaitMS < rmConnectionRetryIntervalMS) {
+        LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
+            + " is smaller than "
+            + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+            + ". Only try connect once.");
+        rmConnectWaitMS = 0;
+      }
+    }
+
+    int rmRetryCount = 0;
+    long waitStartTime = System.currentTimeMillis();
+
+    RegisterNodeManagerRequest request =
+        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
     request.setHttpPort(this.httpPort);
     request.setResource(this.totalResource);
     request.setNodeId(this.nodeId);
-    RegistrationResponse regResponse =
-        this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
+    RegistrationResponse regResponse;
+
+    while(true) {
+      try {
+        rmRetryCount++;
+        LOG.info("Connecting to ResourceManager at " + this.rmAddress
+            + ". current no. of attempts is " + rmRetryCount);
+        this.resourceTracker = getRMClient();
+        regResponse =
+            this.resourceTracker.registerNodeManager(request)
+                .getRegistrationResponse();
+        break;
+      } catch(Throwable e) {
+        LOG.warn("Trying to connect to ResourceManager, " +
+            "current no. of failed attempts is " + rmRetryCount);
+        if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
+            || waitForEver) {
+          try {
+            LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
+                + " seconds before next connection retry to RM");
+            Thread.sleep(rmConnectionRetryIntervalMS);
+          } catch(InterruptedException ex) {
+            // do nothing; fall through and retry
+          }
+        } else {
+          String errorMessage = "Failed to Connect to RM, " +
+              "no. of failed attempts is " + rmRetryCount;
+          LOG.error(errorMessage, e);
+          throw new YarnException(errorMessage, e);
+        }
+      }
+    }
+
     // if the Resourcemanager instructs NM to shutdown.
     if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
       throw new YarnException(
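As a quick sanity check on the numbers (our arithmetic, not code from the patch): with the defaults of a 900-second wait and a 30-second interval, the loop gets roughly 1 + 900/30 = 31 attempts before registerWithRM gives up, and a configured -1 becomes the -1000 ms "wait forever" sentinel after the seconds-to-milliseconds multiplication.

public class RetryWindowDemo {
  public static void main(String[] args) {
    long rmConnectWaitMS = 900 * 1000L;            // default wait: 15*60 seconds
    long rmConnectionRetryIntervalMS = 30 * 1000L; // default interval: 30 seconds
    // -1 seconds * 1000 produces the -1000 ms sentinel checked above.
    boolean waitForEver = (rmConnectWaitMS == -1000);
    long maxAttempts = waitForEver
        ? Long.MAX_VALUE
        : 1 + rmConnectWaitMS / rmConnectionRetryIntervalMS;
    System.out.println("max attempts ~= " + maxAttempts); // prints 31
  }
}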

TestNodeStatusUpdater.java

@@ -267,6 +267,36 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
+    public ResourceTracker resourceTracker =
+        new MyResourceTracker(this.context);
+    private Context context;
+    private final long waitStartTime;
+    private final long rmStartIntervalMS;
+    private final boolean rmNeverStart;
+
+    public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
+        long rmStartIntervalMS, boolean rmNeverStart) {
+      super(context, dispatcher, healthChecker, metrics);
+      this.context = context;
+      this.waitStartTime = System.currentTimeMillis();
+      this.rmStartIntervalMS = rmStartIntervalMS;
+      this.rmNeverStart = rmNeverStart;
+    }
+
+    @Override
+    protected ResourceTracker getRMClient() {
+      if(System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
+          || rmNeverStart) {
+        throw new YarnException("Faking RM start failure as start " +
+            "delay timer has not expired.");
+      } else {
+        return resourceTracker;
+      }
+    }
+  }
+
   private class MyNodeManager extends NodeManager {
 
     private MyNodeStatusUpdater3 nodeStatusUpdater;
@@ -580,6 +610,73 @@ public class TestNodeStatusUpdater {
         + "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
   }
 
+  @Test (timeout = 15000)
+  public void testNMConnectionToRM() {
+    final long delta = 1500;
+    final long connectionWaitSecs = 5;
+    final long connectionRetryIntervalSecs = 1;
+    //Waiting for rmStartIntervalMS, RM will be started
+    final long rmStartIntervalMS = 2*1000;
+    YarnConfiguration conf = createNMConfig();
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+        connectionWaitSecs);
+    conf.setLong(YarnConfiguration
+        .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+        connectionRetryIntervalSecs);
+
+    //Test that NM tries to connect to RM several times, but finally fails
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
+            context, dispatcher, healthChecker, metrics,
+            rmStartIntervalMS, true);
+        return nodeStatusUpdater;
+      }
+    };
+    nm.init(conf);
+    long waitStartTime = System.currentTimeMillis();
+    try {
+      nm.start();
+      Assert.fail("NM should have failed to start due to RM connect failure");
+    } catch(Exception e) {
+      Assert.assertTrue("NM should have tried re-connecting to RM during " +
+          "period of at least " + connectionWaitSecs + " seconds, but " +
+          "stopped retrying within " + (connectionWaitSecs + delta/1000) +
+          " seconds", (System.currentTimeMillis() - waitStartTime
+              >= connectionWaitSecs*1000) && (System.currentTimeMillis()
+              - waitStartTime < (connectionWaitSecs*1000+delta)));
+    }
+
+    //Test that NM fails to connect to RM at the first several attempts,
+    //but finally succeeds
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
+            context, dispatcher, healthChecker, metrics, rmStartIntervalMS,
+            false);
+        return nodeStatusUpdater;
+      }
+    };
+    nm.init(conf);
+    waitStartTime = System.currentTimeMillis();
+    try {
+      nm.start();
+    } catch (Exception ex) {
+      Assert.fail("NM should have started successfully " +
+          "after connecting to RM.");
+    }
+    Assert.assertTrue("NM should have connected to RM within " + delta/1000
+        + " seconds of RM starting up.",
+        (System.currentTimeMillis() - waitStartTime >= rmStartIntervalMS)
+        && (System.currentTimeMillis() - waitStartTime
+            < (rmStartIntervalMS+delta)));
+  }
+
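To exercise just this path outside a full build, a plain JUnit 4 driver works; this is a sketch that assumes it sits in the same package as TestNodeStatusUpdater (org.apache.hadoop.yarn.server.nodemanager) on the nodemanager test classpath:

import org.junit.runner.JUnitCore;
import org.junit.runner.Result;
import org.junit.runner.notification.Failure;

public class RunUpdaterTest {
  public static void main(String[] args) {
    // Runs the whole suite, including testNMConnectionToRM above.
    Result result = JUnitCore.runClasses(TestNodeStatusUpdater.class);
    for (Failure f : result.getFailures()) {
      System.out.println(f.toString());
    }
    System.out.println(result.getRunCount() + " tests, "
        + result.getFailureCount() + " failures");
  }
}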
   /**
    * Verifies that if for some reason NM fails to start ContainerManager RPC
    * server, RM is oblivious to NM's presence. The behaviour is like this