YARN-378. Fix RM to make the AM max attempts/retries configurable per application by clients. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1460895 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-03-25 21:11:33 +00:00
parent 1acb805058
commit 0757364014
20 changed files with 205 additions and 27 deletions

View File

@ -133,6 +133,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-71. Fix the NodeManager to clean up local-dirs on restart.
(Xuan Gong via sseth)
YARN-378. Fix RM to make the AM max attempts/retries configurable
per application by clients. (Zhijie Shen via vinodkv)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -92,6 +92,12 @@ public interface ApplicationConstants {
public static final String STDOUT = "stdout";
/**
 * Name of the environment variable that carries the maximum number of
 * attempts allowed for the application. It is set in the AppMaster
 * environment only.
 */
public static final String MAX_APP_ATTEMPTS_ENV = "MAX_APP_ATTEMPTS";
/**
* Environment for Applications.
*

View File

@ -189,4 +189,22 @@ public interface ApplicationSubmissionContext {
@LimitedPrivate("mapreduce")
@Unstable
public void setCancelTokensWhenComplete(boolean cancel);
/**
 * Get the maximum number of attempts requested for the application to be
 * submitted.
 * @return the number of max attempts of the application to be submitted
 */
@Public
@Unstable
public int getMaxAppAttempts();
/**
 * Set the number of max attempts of the application to be submitted. WARNING:
 * it should be no larger than the global number of max attempts in the Yarn
 * configuration (<code>yarn.resourcemanager.am.max-attempts</code>). When
 * the value is not positive, or is larger than that global number, the
 * ResourceManager uses the global number instead.
 * @param maxAppAttempts the number of max attempts of the application
 * to be submitted.
 */
@Public
@Unstable
public void setMaxAppAttempts(int maxAppAttempts);
}

View File

@ -232,6 +232,18 @@ public void setCancelTokensWhenComplete(boolean cancel) {
builder.setCancelTokensWhenComplete(cancel);
}
/**
 * Returns the max attempts value from {@code proto} when {@code viaProto}
 * is true, otherwise from {@code builder}.
 */
@Override
public int getMaxAppAttempts() {
  if (viaProto) {
    return proto.getMaxAppAttempts();
  }
  return builder.getMaxAppAttempts();
}
/**
 * Stores the given max attempts value on the builder.
 * @param maxAppAttempts the number of max attempts to store
 */
@Override
public void setMaxAppAttempts(int maxAppAttempts) {
// Must run before the builder is written to; presumably makes sure the
// builder exists and reflects the current proto -- confirm in maybeInitBuilder().
maybeInitBuilder();
builder.setMaxAppAttempts(maxAppAttempts);
}
/** Wraps the given {@link PriorityProto} in a new {@link PriorityPBImpl}. */
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}

View File

@ -219,6 +219,7 @@ message ApplicationSubmissionContextProto {
optional ContainerLaunchContextProto am_container_spec = 6;
optional bool cancel_tokens_when_complete = 7 [default = true];
optional bool unmanaged_am = 8 [default = false];
optional int32 maxAppAttempts = 9 [default = 0];
}
enum ApplicationAccessTypeProto {

View File

@ -180,10 +180,13 @@ public class YarnConfiguration extends Configuration {
RM_PREFIX + "admin.client.thread-count";
public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1;
/** The maximum number of application master retries.*/
public static final String RM_AM_MAX_RETRIES =
RM_PREFIX + "am.max-retries";
public static final int DEFAULT_RM_AM_MAX_RETRIES = 1;
/**
* The maximum number of application attempts.
* It's a global setting for all application masters.
*/
public static final String RM_AM_MAX_ATTEMPTS =
RM_PREFIX + "am.max-attempts";
public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 1;
/** The keytab for the resource manager.*/
public static final String RM_KEYTAB =

View File

@ -135,8 +135,12 @@
</property>
<property>
<description>The maximum number of application master retries.</description>
<name>yarn.resourcemanager.am.max-retries</name>
<description>The maximum number of application attempts. It's a global
setting for all application masters. Each application master can specify
its individual maximum number of application attempts via the API, but the
individual number cannot be more than the global upper bound. If it is,
the resourcemanager will override it.</description>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>1</value>
</property>

View File

@ -139,6 +139,8 @@ public RMContext getRMContext() {
@Override
public synchronized void init(Configuration conf) {
validateConfigs(conf);
this.conf = conf;
this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
@ -325,6 +327,15 @@ protected RMAppManager createRMAppManager() {
this.applicationACLsManager, this.conf);
}
/**
 * Validates the ResourceManager configuration. It is called at the very
 * start of {@code init}, before the configuration is stored or used.
 * Currently it only checks that the global maximum number of application
 * attempts is a positive integer.
 *
 * @param conf the configuration to validate
 * @throws YarnException if the configured global max attempts is zero or
 *         negative
 */
protected static void validateConfigs(Configuration conf) {
  int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
  if (globalMaxAppAttempts <= 0) {
    // Name the key and the rejected value so the operator can find the
    // offending setting without reading the source.
    throw new YarnException("Invalid global max attempts configuration, "
        + YarnConfiguration.RM_AM_MAX_ATTEMPTS + "=" + globalMaxAppAttempts
        + ", it should be a positive integer.");
  }
}
@Private
public static class SchedulerEventDispatcher extends AbstractService
implements EventHandler<SchedulerEvent> {

View File

@ -197,6 +197,9 @@ private void setupTokensAndEnv(
String.valueOf(rmContext.getRMApps()
.get(applicationId)
.getSubmitTime()));
environment.put(ApplicationConstants.MAX_APP_ATTEMPTS_ENV,
String.valueOf(rmContext.getRMApps().get(
applicationId).getMaxAppAttempts()));
if (UserGroupInformation.isSecurityEnabled()) {
// TODO: Security enabled/disabled info should come from RM.

View File

@ -173,4 +173,10 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* {@link FinishApplicationMasterRequest#setFinishApplicationStatus(FinalApplicationStatus)}.
*/
FinalApplicationStatus getFinalApplicationStatus();
/**
 * The maximum number of attempts allowed for this application.
 * @return the number of max attempts of the application.
 */
int getMaxAppAttempts();
}

View File

@ -87,7 +87,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private final YarnScheduler scheduler;
private final ApplicationMasterService masterService;
private final StringBuilder diagnostics = new StringBuilder();
private final int maxRetries;
private final int maxAppAttempts;
private final ReadLock readLock;
private final WriteLock writeLock;
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
@ -231,8 +231,19 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.submitTime = submitTime;
this.startTime = System.currentTimeMillis();
this.maxRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
int individualMaxAppAttempts = submissionContext.getMaxAppAttempts();
if (individualMaxAppAttempts <= 0 ||
individualMaxAppAttempts > globalMaxAppAttempts) {
this.maxAppAttempts = globalMaxAppAttempts;
LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+ " for application: " + applicationId.getId()
+ " is invalid, because it is out of the range [1, "
+ globalMaxAppAttempts + "]. Use the global max attempts instead.");
} else {
this.maxAppAttempts = individualMaxAppAttempts;
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
@ -493,6 +504,11 @@ public StringBuilder getDiagnostics() {
}
}
/**
 * Returns the max attempts value that was fixed when this application
 * object was constructed.
 */
@Override
public int getMaxAppAttempts() {
return this.maxAppAttempts;
}
@Override
public void handle(RMAppEvent event) {
@ -669,10 +685,10 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
msg = "Unmanaged application " + app.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
} else if (app.attempts.size() >= app.maxRetries) {
} else if (app.attempts.size() >= app.maxAppAttempts) {
retryApp = false;
msg = "Application " + app.getApplicationId() + " failed "
+ app.maxRetries + " times due to " + failedEvent.getDiagnostics()
+ app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
}

View File

@ -36,7 +36,7 @@
/**
* Interface to an Application Attempt in the Resource Manager.
* A {@link RMApp} can have multiple app attempts based on
* {@link YarnConfiguration#RM_AM_MAX_RETRIES}. For specific
* {@link YarnConfiguration#RM_AM_MAX_ATTEMPTS}. For specific
* implementation take a look at {@link RMAppAttemptImpl}.
*/
public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {

View File

@ -377,7 +377,61 @@ public void testRMAppSubmit() throws Exception {
((Service)rmContext.getDispatcher()).stop();
}
@Test
/**
 * Submits applications with various per-application max attempts under
 * different global limits and checks the value the resulting RMApp reports.
 * Row i of {@code individualMaxAppAttempts} and {@code expectedNums}
 * belongs to {@code globalMaxAppAttempts[i]}; an individual value of 0
 * means "leave the submission context at its default".
 */
@Test (timeout = 30000)
public void testRMAppSubmitMaxAppAttempts() throws Exception {
  int[] globalMaxAppAttempts = new int[] { 10, 1 };
  int[][] individualMaxAppAttempts = new int[][]{
      new int[]{ 9, 10, 11, 0 },
      new int[]{ 1, 10, 0, -1 }};
  int[][] expectedNums = new int[][]{
      new int[]{ 9, 10, 10, 10 },
      new int[]{ 1, 1, 1, 1 }};
  for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
    // Bound by the length of the current row, not by the number of rows;
    // otherwise only the first two entries of each row are ever exercised.
    for (int j = 0; j < individualMaxAppAttempts[i].length; ++j) {
      long now = System.currentTimeMillis();

      RMContext rmContext = mockRMContext(0, now - 10);
      ResourceScheduler scheduler = new CapacityScheduler();
      Configuration conf = new Configuration();
      conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]);
      ApplicationMasterService masterService =
          new ApplicationMasterService(rmContext, scheduler);
      TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
          new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
          new ApplicationACLsManager(conf), conf);

      RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
      ApplicationSubmissionContext context =
          recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
      ContainerLaunchContext amContainer = recordFactory
          .newRecordInstance(ContainerLaunchContext.class);
      amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
      context.setAMContainerSpec(amContainer);
      setupDispatcher(rmContext, conf);

      ApplicationId appID = MockApps.newAppID(1);
      context.setApplicationId(appID);
      // 0 stands for "not specified by the client": keep the default.
      if (individualMaxAppAttempts[i][j] != 0) {
        context.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
      }
      appMonitor.submitApplication(context);
      RMApp app = rmContext.getRMApps().get(appID);
      Assert.assertEquals("max application attempts doesn't match",
          expectedNums[i][j], app.getMaxAppAttempts());

      // wait for event to be processed
      int timeoutSecs = 0;
      while ((getAppEventType() == RMAppEventType.KILL) &&
          timeoutSecs++ < 20) {
        Thread.sleep(1000);
      }
      // Reset the recorded event type so the next iteration waits again.
      setAppEventType(RMAppEventType.KILL);
      ((Service)rmContext.getDispatcher()).stop();
    }
  }
}
@Test (timeout = 3000)
public void testRMAppSubmitWithQueueAndName() throws Exception {
long now = System.currentTimeMillis();

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -60,6 +61,7 @@ private static final class MyContainerManagerImpl implements
int nmPortAtContainerManager;
int nmHttpPortAtContainerManager;
long submitTimeAtContainerManager;
int maxAppAttempts;
@Override
public StartContainerResponse
@ -82,7 +84,8 @@ private static final class MyContainerManagerImpl implements
Integer.parseInt(env.get(ApplicationConstants.NM_HTTP_PORT_ENV));
submitTimeAtContainerManager =
Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
maxAppAttempts =
Integer.parseInt(env.get(ApplicationConstants.MAX_APP_ATTEMPTS_ENV));
return null;
}
@ -139,6 +142,8 @@ public void testAMLaunchAndCleanup() throws Exception {
containerManager.nmPortAtContainerManager);
Assert.assertEquals(nm1.getHttpPort(),
containerManager.nmHttpPortAtContainerManager);
Assert.assertEquals(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS,
containerManager.maxAppAttempts);
MockAM am = new MockAM(rm.getRMContext(), rm
.getApplicationMasterService(), appAttemptId);

View File

@ -374,8 +374,10 @@ private ApplicationId getApplicationId(int id) {
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
ApplicationId applicationId3, YarnConfiguration config, String queueName) {
ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class);
when(asContext.getMaxAppAttempts()).thenReturn(1);
return new RMAppImpl(applicationId3, rmContext, config, null, null,
queueName, null, yarnScheduler, null , System
queueName, asContext, yarnScheduler, null , System
.currentTimeMillis());
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
@ -27,6 +28,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -183,4 +185,16 @@ private void checkResourceUsage(
}
}
/**
 * Checks that ResourceManager initialization rejects a non-positive global
 * max attempts setting. Both a negative value and the zero boundary are
 * covered, because the validation rejects every value {@code <= 0}.
 */
@Test (timeout = 30000)
public void testResourceManagerInitConfigValidation() throws Exception {
  int[] invalidGlobalMaxAttempts = new int[] { -1, 0 };
  for (int invalidValue : invalidGlobalMaxAttempts) {
    Configuration conf = new YarnConfiguration();
    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, invalidValue);
    try {
      resourceManager.init(conf);
      fail("Exception is expected because the global max attempts is not "
          + "positive: " + invalidValue);
    } catch (YarnException e) {
      // Exception is expected.
    }
  }
}
}

View File

@ -216,6 +216,10 @@ public String getTrackingUrl() {
throw new UnsupportedOperationException("Not supported yet.");
}
/** Placeholder implementation: always throws {@link UnsupportedOperationException}. */
@Override
public int getMaxAppAttempts() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
throw new UnsupportedOperationException("Not supported yet.");
}
@ -248,6 +252,7 @@ public static RMApp newApplication(int i) {
final long finish = 234567 + i * 1000;
RMAppState[] allStates = RMAppState.values();
final RMAppState state = allStates[i % allStates.length];
final int maxAppAttempts = i % 1000;
return new ApplicationBase() {
@Override
public ApplicationId getApplicationId() {
@ -302,6 +307,11 @@ public FinalApplicationStatus getFinalApplicationStatus() {
public RMAppAttempt getCurrentAppAttempt() {
return null;
}
/** Returns the {@code maxAppAttempts} value captured from the enclosing method. */
@Override
public int getMaxAppAttempts() {
return maxAppAttempts;
}
};
}

View File

@ -48,6 +48,7 @@ public class MockRMApp implements RMApp {
String url = null;
StringBuilder diagnostics = new StringBuilder();
RMAppAttempt attempt;
int maxAppAttempts = 1;
public MockRMApp(int newid, long time, RMAppState newState) {
finish = time;
@ -186,6 +187,15 @@ public void setDiagnostics(String diag) {
this.diagnostics = new StringBuilder(diag);
}
/** Returns the current value of this mock's {@code maxAppAttempts} field. */
@Override
public int getMaxAppAttempts() {
return maxAppAttempts;
}
/**
 * Sets the value returned by {@link #getMaxAppAttempts()}. Kept under its
 * "retries" name for existing callers; it simply delegates to
 * {@link #setMaxAppAttempts(int)}.
 * @param maxAppAttempts the max attempts value this mock should report
 */
public void setNumMaxRetries(int maxAppAttempts) {
  setMaxAppAttempts(maxAppAttempts);
}

/**
 * Sets the value returned by {@link #getMaxAppAttempts()}, using the same
 * "attempts" terminology as the getter and the backing field.
 * @param maxAppAttempts the max attempts value this mock should report
 */
public void setMaxAppAttempts(int maxAppAttempts) {
  this.maxAppAttempts = maxAppAttempts;
}
@Override
public void handle(RMAppEvent event) {
}

View File

@ -61,7 +61,7 @@ public class TestRMAppTransitions {
static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
private RMContext rmContext;
private static int maxRetries = 4;
private static int maxAppAttempts = 4;
private static int appId = 1;
private DrainDispatcher rmDispatcher;
@ -167,8 +167,8 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext)
String name = MockApps.newAppName();
String queue = MockApps.newQueue();
Configuration conf = new YarnConfiguration();
// ensure max retries set to known value
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, maxRetries);
// ensure max application attempts set to known value
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, maxAppAttempts);
YarnScheduler scheduler = mock(YarnScheduler.class);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
@ -437,7 +437,7 @@ public void testAppAcceptedFailed() throws IOException {
RMApp application = testCreateAppAccepted(null);
// ACCEPTED => ACCEPTED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED
for (int i=1; i<maxRetries; i++) {
for (int i=1; i < maxAppAttempts; i++) {
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
@ -452,7 +452,7 @@ public void testAppAcceptedFailed() throws IOException {
}
// ACCEPTED => FAILED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED
// after max retries
// after max application attempts
String message = "Test fail";
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
@ -500,7 +500,7 @@ public void testAppRunningFailed() throws IOException {
Assert.assertEquals(expectedAttemptId,
appAttempt.getAppAttemptId().getAttemptId());
// RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED
for (int i=1; i<maxRetries; i++) {
for (int i=1; i<maxAppAttempts; i++) {
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
@ -525,7 +525,7 @@ public void testAppRunningFailed() throws IOException {
}
// RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED
// after max retries
// after max application attempts
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");

View File

@ -83,7 +83,7 @@ protected void configureServlets() {
bind(RMWebServices.class);
bind(GenericExceptionHandler.class);
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 2);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class);
rm = new MockRM(conf);
@ -871,9 +871,9 @@ public void testMultipleAppAttempts() throws JSONException, Exception {
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
int maxRetries = rm.getConfig().getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
int retriesLeft = maxRetries;
int maxAppAttempts = rm.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
int retriesLeft = maxAppAttempts;
while (--retriesLeft > 0) {
RMAppEvent event =
new RMAppFailedAttemptEvent(app1.getApplicationId(),
@ -882,7 +882,7 @@ public void testMultipleAppAttempts() throws JSONException, Exception {
rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
amNodeManager.nodeHeartbeat(true);
}
assertEquals("incorrect number of attempts", maxRetries,
assertEquals("incorrect number of attempts", maxAppAttempts,
app1.getAppAttempts().values().size());
testAppAttemptsHelper(app1.getApplicationId().toString(), app1,
MediaType.APPLICATION_JSON);