YARN-10212. Create separate configuration for max global AM attempts. Contributed by Bilwa S T
(cherry picked from commit 57659422abbf6d9bf52e6e27fca775254bb77a56)
This commit is contained in:
parent
e4331a73c9
commit
e1dd78143b
|
@ -502,13 +502,20 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1;
|
public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The maximum number of application attempts.
|
* The default maximum number of application attempts for
|
||||||
* It's a global setting for all application masters.
|
* an application, if unset by the user.
|
||||||
*/
|
*/
|
||||||
public static final String RM_AM_MAX_ATTEMPTS =
|
public static final String RM_AM_MAX_ATTEMPTS =
|
||||||
RM_PREFIX + "am.max-attempts";
|
RM_PREFIX + "am.max-attempts";
|
||||||
public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2;
|
public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of application attempts.
|
||||||
|
* It's a global setting for all application masters.
|
||||||
|
*/
|
||||||
|
public static final String GLOBAL_RM_AM_MAX_ATTEMPTS =
|
||||||
|
RM_PREFIX + "am.global.max-attempts";
|
||||||
|
|
||||||
/** The keytab for the resource manager.*/
|
/** The keytab for the resource manager.*/
|
||||||
public static final String RM_KEYTAB =
|
public static final String RM_KEYTAB =
|
||||||
RM_PREFIX + "keytab";
|
RM_PREFIX + "keytab";
|
||||||
|
|
|
@ -327,11 +327,10 @@
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>The maximum number of application attempts. It's a global
|
<description>The default maximum number of application attempts, if unset by
|
||||||
setting for all application masters. Each application master can specify
|
the user. Each application master can specify its individual maximum number of application
|
||||||
its individual maximum number of application attempts via the API, but the
|
attempts via the API, but the individual number cannot be more than the global upper bound in
|
||||||
individual number cannot be more than the global upper bound. If it is,
|
yarn.resourcemanager.am.global.max-attempts. The default number is set to 2, to
|
||||||
the resourcemanager will override it. The default number is set to 2, to
|
|
||||||
allow at least one retry for AM.</description>
|
allow at least one retry for AM.</description>
|
||||||
<name>yarn.resourcemanager.am.max-attempts</name>
|
<name>yarn.resourcemanager.am.max-attempts</name>
|
||||||
<value>2</value>
|
<value>2</value>
|
||||||
|
@ -4534,4 +4533,18 @@
|
||||||
<name>yarn.webapp.enable-rest-app-submissions</name>
|
<name>yarn.webapp.enable-rest-app-submissions</name>
|
||||||
<value>true</value>
|
<value>true</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
The maximum number of application attempts. It's a global
|
||||||
|
setting for all application masters. Each application master can specify
|
||||||
|
its individual maximum number of application attempts via the API, but the
|
||||||
|
individual number cannot be more than the global upper bound. If it is,
|
||||||
|
the resourcemanager will override it. If unset, it defaults to the value of
|
||||||
|
yarn.resourcemanager.am.max-attempts.
|
||||||
|
</description>
|
||||||
|
<name>yarn.resourcemanager.am.global.max-attempts</name>
|
||||||
|
<value></value>
|
||||||
|
</property>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -613,12 +613,20 @@ public class ResourceManager extends CompositeService
|
||||||
// sanity check for configurations
|
// sanity check for configurations
|
||||||
protected static void validateConfigs(Configuration conf) {
|
protected static void validateConfigs(Configuration conf) {
|
||||||
// validate max-attempts
|
// validate max-attempts
|
||||||
int globalMaxAppAttempts =
|
int rmMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||||
|
if (rmMaxAppAttempts <= 0) {
|
||||||
|
throw new YarnRuntimeException("Invalid rm am max attempts configuration"
|
||||||
|
+ ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
|
||||||
|
+ "=" + rmMaxAppAttempts + ", it should be a positive integer.");
|
||||||
|
}
|
||||||
|
int globalMaxAppAttempts = conf.getInt(
|
||||||
|
YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
|
||||||
|
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
|
||||||
if (globalMaxAppAttempts <= 0) {
|
if (globalMaxAppAttempts <= 0) {
|
||||||
throw new YarnRuntimeException("Invalid global max attempts configuration"
|
throw new YarnRuntimeException("Invalid global max attempts configuration"
|
||||||
+ ", " + YarnConfiguration.RM_AM_MAX_ATTEMPTS
|
+ ", " + YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS
|
||||||
+ "=" + globalMaxAppAttempts + ", it should be a positive integer.");
|
+ "=" + globalMaxAppAttempts + ", it should be a positive integer.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -453,11 +453,20 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
this.applicationPriority = Priority.newInstance(0);
|
this.applicationPriority = Priority.newInstance(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
int globalMaxAppAttempts = conf.getInt(
|
||||||
|
YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
|
||||||
|
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
|
||||||
|
int rmMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||||
int individualMaxAppAttempts = submissionContext.getMaxAppAttempts();
|
int individualMaxAppAttempts = submissionContext.getMaxAppAttempts();
|
||||||
if (individualMaxAppAttempts <= 0 ||
|
if (individualMaxAppAttempts <= 0) {
|
||||||
individualMaxAppAttempts > globalMaxAppAttempts) {
|
this.maxAppAttempts = rmMaxAppAttempts;
|
||||||
|
LOG.warn("The specific max attempts: " + individualMaxAppAttempts
|
||||||
|
+ " for application: " + applicationId.getId()
|
||||||
|
+ " is invalid, because it is less than or equal to zero."
|
||||||
|
+ " Use the rm max attempts instead.");
|
||||||
|
} else if (individualMaxAppAttempts > globalMaxAppAttempts) {
|
||||||
this.maxAppAttempts = globalMaxAppAttempts;
|
this.maxAppAttempts = globalMaxAppAttempts;
|
||||||
LOG.warn("The specific max attempts: " + individualMaxAppAttempts
|
LOG.warn("The specific max attempts: " + individualMaxAppAttempts
|
||||||
+ " for application: " + applicationId.getId()
|
+ " for application: " + applicationId.getId()
|
||||||
|
@ -1211,8 +1220,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
+ " failed due to " + failedEvent.getDiagnosticMsg()
|
+ " failed due to " + failedEvent.getDiagnosticMsg()
|
||||||
+ ". Failing the application.";
|
+ ". Failing the application.";
|
||||||
} else if (this.isNumAttemptsBeyondThreshold) {
|
} else if (this.isNumAttemptsBeyondThreshold) {
|
||||||
int globalLimit = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
int globalLimit = conf.getInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
|
||||||
msg = String.format(
|
msg = String.format(
|
||||||
"Application %s failed %d times%s%s due to %s. Failing the application.",
|
"Application %s failed %d times%s%s due to %s. Failing the application.",
|
||||||
getApplicationId(),
|
getApplicationId(),
|
||||||
|
|
|
@ -980,17 +980,20 @@ public class TestAppManager extends AppManagerTestBase{
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testRMAppSubmitMaxAppAttempts() throws Exception {
|
public void testRMAppSubmitMaxAppAttempts() throws Exception {
|
||||||
int[] globalMaxAppAttempts = new int[] { 10, 1 };
|
int[] globalMaxAppAttempts = new int[] { 10, 1 };
|
||||||
|
int[] rmAmMaxAttempts = new int[] { 8, 1 };
|
||||||
int[][] individualMaxAppAttempts = new int[][]{
|
int[][] individualMaxAppAttempts = new int[][]{
|
||||||
new int[]{ 9, 10, 11, 0 },
|
new int[]{ 9, 10, 11, 0 },
|
||||||
new int[]{ 1, 10, 0, -1 }};
|
new int[]{ 1, 10, 0, -1 }};
|
||||||
int[][] expectedNums = new int[][]{
|
int[][] expectedNums = new int[][]{
|
||||||
new int[]{ 9, 10, 10, 10 },
|
new int[]{ 9, 10, 10, 8 },
|
||||||
new int[]{ 1, 1, 1, 1 }};
|
new int[]{ 1, 1, 1, 1 }};
|
||||||
for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
|
for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
|
||||||
for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
|
for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
|
||||||
ResourceScheduler scheduler = mockResourceScheduler();
|
ResourceScheduler scheduler = mockResourceScheduler();
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]);
|
conf.setInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
|
||||||
|
globalMaxAppAttempts[i]);
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, rmAmMaxAttempts[i]);
|
||||||
ApplicationMasterService masterService =
|
ApplicationMasterService masterService =
|
||||||
new ApplicationMasterService(rmContext, scheduler);
|
new ApplicationMasterService(rmContext, scheduler);
|
||||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||||
|
|
|
@ -237,7 +237,7 @@ public class TestResourceManager {
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testResourceManagerInitConfigValidation() throws Exception {
|
public void testResourceManagerInitConfigValidation() throws Exception {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, -1);
|
conf.setInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS, -1);
|
||||||
try {
|
try {
|
||||||
resourceManager = new MockRM(conf);
|
resourceManager = new MockRM(conf);
|
||||||
fail("Exception is expected because the global max attempts" +
|
fail("Exception is expected because the global max attempts" +
|
||||||
|
@ -247,6 +247,17 @@ public class TestResourceManager {
|
||||||
if (!e.getMessage().startsWith(
|
if (!e.getMessage().startsWith(
|
||||||
"Invalid global max attempts configuration")) throw e;
|
"Invalid global max attempts configuration")) throw e;
|
||||||
}
|
}
|
||||||
|
Configuration yarnConf = new YarnConfiguration();
|
||||||
|
yarnConf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, -1);
|
||||||
|
try {
|
||||||
|
resourceManager = new MockRM(yarnConf);
|
||||||
|
fail("Exception is expected because AM max attempts" +
|
||||||
|
" is negative.");
|
||||||
|
} catch (YarnRuntimeException e) {
|
||||||
|
// Exception is expected.
|
||||||
|
if (!e.getMessage().startsWith(
|
||||||
|
"Invalid rm am max attempts configuration")) throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue