YARN-4132. Separate configs for nodemanager to resourcemanager connection timeout and retries. Contributed by Chang Li
(cherry picked from commit 4ac6799d4a
)
This commit is contained in:
parent
2f18218508
commit
9e54433c6c
|
@ -519,6 +519,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-3980. Plumb resource-utilization info in node heartbeat through to the
|
||||
scheduler. (Inigo Goiri via kasha)
|
||||
|
||||
YARN-4132. Separate configs for nodemanager to resourcemanager connection
|
||||
timeout and retries (Chang Li via jlowe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
|
|
@ -2051,6 +2051,22 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
|
||||
.name();
|
||||
|
||||
/**
|
||||
* Max time to wait for NM to connection to RM.
|
||||
* When not set, proxy will fall back to use value of
|
||||
* RESOURCEMANAGER_CONNECT_MAX_WAIT_MS.
|
||||
*/
|
||||
public static final String NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS =
|
||||
YARN_PREFIX + "nodemanager.resourcemanager.connect.max-wait.ms";
|
||||
|
||||
/**
|
||||
* Time interval between each NM attempt to connection to RM.
|
||||
* When not set, proxy will fall back to use value of
|
||||
* RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS.
|
||||
*/
|
||||
public static final String NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS =
|
||||
YARN_PREFIX + "nodemanager.resourcemanager.connect.retry-interval.ms";
|
||||
|
||||
/**
|
||||
* Node-labels configurations
|
||||
*/
|
||||
|
|
|
@ -88,7 +88,32 @@ public class RMProxy<T> {
|
|||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||
? (YarnConfiguration) configuration
|
||||
: new YarnConfiguration(configuration);
|
||||
RetryPolicy retryPolicy = createRetryPolicy(conf);
|
||||
RetryPolicy retryPolicy =
|
||||
createRetryPolicy(conf);
|
||||
return createRMProxy(conf, protocol, instance, retryPolicy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a proxy for the specified protocol. For non-HA,
|
||||
* this is a direct connection to the ResourceManager address. When HA is
|
||||
* enabled, the proxy handles the failover between the ResourceManagers as
|
||||
* well.
|
||||
*/
|
||||
@Private
|
||||
protected static <T> T createRMProxy(final Configuration configuration,
|
||||
final Class<T> protocol, RMProxy instance, final long retryTime,
|
||||
final long retryInterval) throws IOException {
|
||||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||
? (YarnConfiguration) configuration
|
||||
: new YarnConfiguration(configuration);
|
||||
RetryPolicy retryPolicy =
|
||||
createRetryPolicy(conf, retryTime, retryInterval);
|
||||
return createRMProxy(conf, protocol, instance, retryPolicy);
|
||||
}
|
||||
|
||||
private static <T> T createRMProxy(final YarnConfiguration conf,
|
||||
final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy)
|
||||
throws IOException{
|
||||
if (HAUtil.isHAEnabled(conf)) {
|
||||
RMFailoverProxyProvider<T> provider =
|
||||
instance.createRMFailoverProxyProvider(conf, protocol);
|
||||
|
@ -179,6 +204,18 @@ public class RMProxy<T> {
|
|||
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration
|
||||
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
|
||||
return createRetryPolicy(
|
||||
conf, rmConnectWaitMS, rmConnectionRetryIntervalMS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch retry policy from Configuration and create the
|
||||
* retry policy with specified retryTime and retry interval.
|
||||
*/
|
||||
private static RetryPolicy createRetryPolicy(Configuration conf,
|
||||
long retryTime, long retryInterval) {
|
||||
long rmConnectWaitMS = retryTime;
|
||||
long rmConnectionRetryIntervalMS = retryInterval;
|
||||
|
||||
boolean waitForEver = (rmConnectWaitMS == -1);
|
||||
if (!waitForEver) {
|
||||
|
|
|
@ -1549,6 +1549,26 @@
|
|||
<value>10000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Max time to wait for NM to connect to RM.
|
||||
When not set, proxy will fall back to use value of
|
||||
yarn.resourcemanager.connect.max-wait.ms.
|
||||
</description>
|
||||
<name>yarn.nodemanager.resourcemanager.connect.max-wait.ms</name>
|
||||
<value></value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Time interval between each NM attempt to connect to RM.
|
||||
When not set, proxy will fall back to use value of
|
||||
yarn.resourcemanager.connect.retry-interval.ms.
|
||||
</description>
|
||||
<name>yarn.nodemanager.resourcemanager.connect.retry-interval.ms</name>
|
||||
<value></value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Maximum number of proxy connections to cache for node managers. If set
|
||||
|
|
|
@ -48,7 +48,25 @@ public class ServerRMProxy<T> extends RMProxy<T> {
|
|||
*/
|
||||
public static <T> T createRMProxy(final Configuration configuration,
|
||||
final Class<T> protocol) throws IOException {
|
||||
return createRMProxy(configuration, protocol, INSTANCE);
|
||||
long rmConnectWait =
|
||||
configuration.getLong(
|
||||
YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
||||
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
|
||||
long rmRetryInterval =
|
||||
configuration.getLong(
|
||||
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration
|
||||
.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
|
||||
long nmRmConnectWait =
|
||||
configuration.getLong(
|
||||
YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
||||
rmConnectWait);
|
||||
long nmRmRetryInterval =
|
||||
configuration.getLong(
|
||||
YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||
rmRetryInterval);
|
||||
return createRMProxy(configuration, protocol, INSTANCE,
|
||||
nmRmConnectWait, nmRmRetryInterval);
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
|
|
@ -95,6 +95,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
|
@ -486,6 +487,35 @@ public class TestNodeStatusUpdater {
|
|||
}
|
||||
}
|
||||
|
||||
private class MyNodeStatusUpdater6 extends NodeStatusUpdaterImpl {
|
||||
|
||||
private final long rmStartIntervalMS;
|
||||
private final boolean rmNeverStart;
|
||||
public ResourceTracker resourceTracker;
|
||||
public MyNodeStatusUpdater6(Context context, Dispatcher dispatcher,
|
||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
||||
long rmStartIntervalMS, boolean rmNeverStart) {
|
||||
super(context, dispatcher, healthChecker, metrics);
|
||||
this.rmStartIntervalMS = rmStartIntervalMS;
|
||||
this.rmNeverStart = rmNeverStart;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
//record the startup time
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
private boolean isTriggered() {
|
||||
return triggered;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopRMProxy() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private class MyNodeManager extends NodeManager {
|
||||
|
||||
private MyNodeStatusUpdater3 nodeStatusUpdater;
|
||||
|
@ -1309,6 +1339,59 @@ public class TestNodeStatusUpdater {
|
|||
+ "Message from ResourceManager: RM Shutting Down Node");
|
||||
}
|
||||
|
||||
@Test (timeout = 100000)
|
||||
public void testNMRMConnectionConf() throws Exception {
|
||||
final long delta = 50000;
|
||||
final long nmRmConnectionWaitMs = 100;
|
||||
final long nmRmRetryInterval = 100;
|
||||
final long connectionWaitMs = -1;
|
||||
final long connectionRetryIntervalMs = 1000;
|
||||
//Waiting for rmStartIntervalMS, RM will be started
|
||||
final long rmStartIntervalMS = 2*1000;
|
||||
conf.setLong(YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
||||
nmRmConnectionWaitMs);
|
||||
conf.setLong(
|
||||
YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||
nmRmRetryInterval);
|
||||
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
|
||||
connectionWaitMs);
|
||||
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||
connectionRetryIntervalMs);
|
||||
conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
||||
1);
|
||||
//Test NM try to connect to RM Several times, but finally fail
|
||||
NodeManagerWithCustomNodeStatusUpdater nmWithUpdater;
|
||||
nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() {
|
||||
@Override
|
||||
protected NodeStatusUpdater createUpdater(Context context,
|
||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater6(
|
||||
context, dispatcher, healthChecker, metrics,
|
||||
rmStartIntervalMS, true);
|
||||
return nodeStatusUpdater;
|
||||
}
|
||||
};
|
||||
nm.init(conf);
|
||||
long waitStartTime = System.currentTimeMillis();
|
||||
try {
|
||||
nm.start();
|
||||
Assert.fail("NM should have failed to start due to RM connect failure");
|
||||
} catch(Exception e) {
|
||||
long t = System.currentTimeMillis();
|
||||
long duration = t - waitStartTime;
|
||||
boolean waitTimeValid = (duration >= nmRmConnectionWaitMs) &&
|
||||
(duration < (connectionWaitMs + delta));
|
||||
|
||||
if(!waitTimeValid) {
|
||||
// throw exception if NM doesn't retry long enough
|
||||
throw new Exception("NM should have tried re-connecting to RM during " +
|
||||
"period of at least " + connectionWaitMs + " ms, but " +
|
||||
"stopped retrying within " + (connectionWaitMs + delta) +
|
||||
" ms: " + e, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 150000)
|
||||
public void testNMConnectionToRM() throws Exception {
|
||||
final long delta = 50000;
|
||||
|
|
Loading…
Reference in New Issue