YARN-378. Fix RM to make the AM max attempts/retries to be configurable per application by clients. Contributed by Zhijie Shen.

svn merge --ignore-ancestry -c 1460895 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1460897 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-03-25 21:14:42 +00:00
parent 6d784e9be0
commit 8ce818a836
20 changed files with 205 additions and 27 deletions

View File

@ -78,6 +78,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-71. Fix the NodeManager to clean up local-dirs on restart.
(Xuan Gong via sseth)
YARN-378. Fix RM to make the AM max attempts/retries to be configurable
per application by clients. (Zhijie Shen via vinodkv)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -91,6 +91,12 @@ public interface ApplicationConstants {
public static final String STDOUT = "stdout";
/**
* The environment variable for MAX_APP_ATTEMPTS. Set in AppMaster environment
* only
*/
public static final String MAX_APP_ATTEMPTS_ENV = "MAX_APP_ATTEMPTS";
/**
* Environment for Applications.
*

View File

@ -189,4 +189,22 @@ public interface ApplicationSubmissionContext {
@LimitedPrivate("mapreduce")
@Unstable
public void setCancelTokensWhenComplete(boolean cancel);
/**
* @return the number of max attempts of the application to be submitted
*/
@Public
@Unstable
public int getMaxAppAttempts();
/**
* Set the number of max attempts of the application to be submitted. WARNING:
* it should be no larger than the global number of max attempts in the Yarn
* configuration.
* @param maxAppAttempts the number of max attempts of the application
* to be submitted.
*/
@Public
@Unstable
public void setMaxAppAttempts(int maxAppAttempts);
}

View File

@ -232,6 +232,18 @@ implements ApplicationSubmissionContext {
builder.setCancelTokensWhenComplete(cancel);
}
@Override
public int getMaxAppAttempts() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
return p.getMaxAppAttempts();
}
@Override
public void setMaxAppAttempts(int maxAppAttempts) {
maybeInitBuilder();
builder.setMaxAppAttempts(maxAppAttempts);
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}

View File

@ -219,6 +219,7 @@ message ApplicationSubmissionContextProto {
optional ContainerLaunchContextProto am_container_spec = 6;
optional bool cancel_tokens_when_complete = 7 [default = true];
optional bool unmanaged_am = 8 [default = false];
optional int32 maxAppAttempts = 9 [default = 0];
}
enum ApplicationAccessTypeProto {

View File

@ -180,10 +180,13 @@ public class YarnConfiguration extends Configuration {
RM_PREFIX + "admin.client.thread-count";
public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1;
/** The maximum number of application master retries.*/
public static final String RM_AM_MAX_RETRIES =
RM_PREFIX + "am.max-retries";
public static final int DEFAULT_RM_AM_MAX_RETRIES = 1;
/**
* The maximum number of application attempts.
* It's a global setting for all application masters.
*/
public static final String RM_AM_MAX_ATTEMPTS =
RM_PREFIX + "am.max-attempts";
public static final int DEFAULT_RM_AM_MAX_ATTEMPTS = 1;
/** The keytab for the resource manager.*/
public static final String RM_KEYTAB =

View File

@ -135,8 +135,12 @@
</property>
<property>
<description>The maximum number of application master retries.</description>
<name>yarn.resourcemanager.am.max-retries</name>
<description>The maximum number of application attempts. It's a global
setting for all application masters. Each application master can specify
its individual maximum number of application attempts via the API, but the
individual number cannot be more than the global upper bound. If it is,
the resourcemanager will override it.</description>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>1</value>
</property>

View File

@ -139,6 +139,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
public synchronized void init(Configuration conf) {
validateConfigs(conf);
this.conf = conf;
this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
@ -325,6 +327,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.applicationACLsManager, this.conf);
}
protected static void validateConfigs(Configuration conf) {
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
if (globalMaxAppAttempts <= 0) {
throw new YarnException(
"The global max attempts should be a positive integer.");
}
}
@Private
public static class SchedulerEventDispatcher extends AbstractService
implements EventHandler<SchedulerEvent> {

View File

@ -197,6 +197,9 @@ public class AMLauncher implements Runnable {
String.valueOf(rmContext.getRMApps()
.get(applicationId)
.getSubmitTime()));
environment.put(ApplicationConstants.MAX_APP_ATTEMPTS_ENV,
String.valueOf(rmContext.getRMApps().get(
applicationId).getMaxAppAttempts()));
if (UserGroupInformation.isSecurityEnabled()) {
// TODO: Security enabled/disabled info should come from RM.

View File

@ -173,4 +173,10 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* {@link FinishApplicationMasterRequest#setFinishApplicationStatus(FinalApplicationStatus)}.
*/
FinalApplicationStatus getFinalApplicationStatus();
/**
* The number of max attempts of the application.
* @return the number of max attempts of the application.
*/
int getMaxAppAttempts();
}

View File

@ -87,7 +87,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private final YarnScheduler scheduler;
private final ApplicationMasterService masterService;
private final StringBuilder diagnostics = new StringBuilder();
private final int maxRetries;
private final int maxAppAttempts;
private final ReadLock readLock;
private final WriteLock writeLock;
private final Map<ApplicationAttemptId, RMAppAttempt> attempts
@ -231,8 +231,19 @@ public class RMAppImpl implements RMApp, Recoverable {
this.submitTime = submitTime;
this.startTime = System.currentTimeMillis();
this.maxRetries = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
int individualMaxAppAttempts = submissionContext.getMaxAppAttempts();
if (individualMaxAppAttempts <= 0 ||
individualMaxAppAttempts > globalMaxAppAttempts) {
this.maxAppAttempts = globalMaxAppAttempts;
LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+ " for application: " + applicationId.getId()
+ " is invalid, because it is out of the range [1, "
+ globalMaxAppAttempts + "]. Use the global max attempts instead.");
} else {
this.maxAppAttempts = individualMaxAppAttempts;
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
@ -493,6 +504,11 @@ public class RMAppImpl implements RMApp, Recoverable {
}
}
@Override
public int getMaxAppAttempts() {
return this.maxAppAttempts;
}
@Override
public void handle(RMAppEvent event) {
@ -669,10 +685,10 @@ public class RMAppImpl implements RMApp, Recoverable {
msg = "Unmanaged application " + app.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
} else if (app.attempts.size() >= app.maxRetries) {
} else if (app.attempts.size() >= app.maxAppAttempts) {
retryApp = false;
msg = "Application " + app.getApplicationId() + " failed "
+ app.maxRetries + " times due to " + failedEvent.getDiagnostics()
+ app.maxAppAttempts + " times due to " + failedEvent.getDiagnostics()
+ ". Failing the application.";
}

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/**
* Interface to an Application Attempt in the Resource Manager.
* A {@link RMApp} can have multiple app attempts based on
* {@link YarnConfiguration#RM_AM_MAX_RETRIES}. For specific
* {@link YarnConfiguration#RM_AM_MAX_ATTEMPTS}. For specific
* implementation take a look at {@link RMAppAttemptImpl}.
*/
public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {

View File

@ -377,7 +377,61 @@ public class TestAppManager{
((Service)rmContext.getDispatcher()).stop();
}
@Test
@Test (timeout = 30000)
public void testRMAppSubmitMaxAppAttempts() throws Exception {
int[] globalMaxAppAttempts = new int[] { 10, 1 };
int[][] individualMaxAppAttempts = new int[][]{
new int[]{ 9, 10, 11, 0 },
new int[]{ 1, 10, 0, -1 }};
int[][] expectedNums = new int[][]{
new int[]{ 9, 10, 10, 10 },
new int[]{ 1, 1, 1, 1 }};
for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(0, now - 10);
ResourceScheduler scheduler = new CapacityScheduler();
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
new ApplicationACLsManager(conf), conf);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ApplicationSubmissionContext context =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
ContainerLaunchContext amContainer = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
context.setAMContainerSpec(amContainer);
setupDispatcher(rmContext, conf);
ApplicationId appID = MockApps.newAppID(1);
context.setApplicationId(appID);
if (individualMaxAppAttempts[i][j] != 0) {
context.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
}
appMonitor.submitApplication(context);
RMApp app = rmContext.getRMApps().get(appID);
Assert.assertEquals("max application attempts doesn't match",
expectedNums[i][j], app.getMaxAppAttempts());
// wait for event to be processed
int timeoutSecs = 0;
while ((getAppEventType() == RMAppEventType.KILL) &&
timeoutSecs++ < 20) {
Thread.sleep(1000);
}
setAppEventType(RMAppEventType.KILL);
((Service)rmContext.getDispatcher()).stop();
}
}
}
@Test (timeout = 3000)
public void testRMAppSubmitWithQueueAndName() throws Exception {
long now = System.currentTimeMillis();

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -60,6 +61,7 @@ public class TestApplicationMasterLauncher {
int nmPortAtContainerManager;
int nmHttpPortAtContainerManager;
long submitTimeAtContainerManager;
int maxAppAttempts;
@Override
public StartContainerResponse
@ -82,7 +84,8 @@ public class TestApplicationMasterLauncher {
Integer.parseInt(env.get(ApplicationConstants.NM_HTTP_PORT_ENV));
submitTimeAtContainerManager =
Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV));
maxAppAttempts =
Integer.parseInt(env.get(ApplicationConstants.MAX_APP_ATTEMPTS_ENV));
return null;
}
@ -139,6 +142,8 @@ public class TestApplicationMasterLauncher {
containerManager.nmPortAtContainerManager);
Assert.assertEquals(nm1.getHttpPort(),
containerManager.nmHttpPortAtContainerManager);
Assert.assertEquals(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS,
containerManager.maxAppAttempts);
MockAM am = new MockAM(rm.getRMContext(), rm
.getApplicationMasterService(), appAttemptId);

View File

@ -374,8 +374,10 @@ public class TestClientRMService {
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
ApplicationId applicationId3, YarnConfiguration config, String queueName) {
ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class);
when(asContext.getMaxAppAttempts()).thenReturn(1);
return new RMAppImpl(applicationId3, rmContext, config, null, null,
queueName, null, yarnScheduler, null , System
queueName, asContext, yarnScheduler, null , System
.currentTimeMillis());
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
@ -27,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -183,4 +185,16 @@ public class TestResourceManager {
}
}
@Test (timeout = 30000)
public void testResourceManagerInitConfigValidation() throws Exception {
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, -1);
try {
resourceManager.init(conf);
fail("Exception is expected because the global max attempts is negative.");
} catch (YarnException e) {
// Exception is expected.
}
}
}

View File

@ -216,6 +216,10 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getMaxAppAttempts() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public ApplicationReport createAndGetApplicationReport(boolean allowAccess) {
throw new UnsupportedOperationException("Not supported yet.");
}
@ -248,6 +252,7 @@ public abstract class MockAsm extends MockApps {
final long finish = 234567 + i * 1000;
RMAppState[] allStates = RMAppState.values();
final RMAppState state = allStates[i % allStates.length];
final int maxAppAttempts = i % 1000;
return new ApplicationBase() {
@Override
public ApplicationId getApplicationId() {
@ -302,6 +307,11 @@ public abstract class MockAsm extends MockApps {
public RMAppAttempt getCurrentAppAttempt() {
return null;
}
@Override
public int getMaxAppAttempts() {
return maxAppAttempts;
}
};
}

View File

@ -48,6 +48,7 @@ public class MockRMApp implements RMApp {
String url = null;
StringBuilder diagnostics = new StringBuilder();
RMAppAttempt attempt;
int maxAppAttempts = 1;
public MockRMApp(int newid, long time, RMAppState newState) {
finish = time;
@ -186,6 +187,15 @@ public class MockRMApp implements RMApp {
this.diagnostics = new StringBuilder(diag);
}
@Override
public int getMaxAppAttempts() {
return maxAppAttempts;
}
public void setNumMaxRetries(int maxAppAttempts) {
this.maxAppAttempts = maxAppAttempts;
}
@Override
public void handle(RMAppEvent event) {
}

View File

@ -61,7 +61,7 @@ public class TestRMAppTransitions {
static final Log LOG = LogFactory.getLog(TestRMAppTransitions.class);
private RMContext rmContext;
private static int maxRetries = 4;
private static int maxAppAttempts = 4;
private static int appId = 1;
private DrainDispatcher rmDispatcher;
@ -167,8 +167,8 @@ public class TestRMAppTransitions {
String name = MockApps.newAppName();
String queue = MockApps.newQueue();
Configuration conf = new YarnConfiguration();
// ensure max retries set to known value
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, maxRetries);
// ensure max application attempts set to known value
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, maxAppAttempts);
YarnScheduler scheduler = mock(YarnScheduler.class);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
@ -437,7 +437,7 @@ public class TestRMAppTransitions {
RMApp application = testCreateAppAccepted(null);
// ACCEPTED => ACCEPTED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED
for (int i=1; i<maxRetries; i++) {
for (int i=1; i < maxAppAttempts; i++) {
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
@ -452,7 +452,7 @@ public class TestRMAppTransitions {
}
// ACCEPTED => FAILED event RMAppEventType.RMAppEventType.ATTEMPT_FAILED
// after max retries
// after max application attempts
String message = "Test fail";
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
@ -500,7 +500,7 @@ public class TestRMAppTransitions {
Assert.assertEquals(expectedAttemptId,
appAttempt.getAppAttemptId().getAttemptId());
// RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED
for (int i=1; i<maxRetries; i++) {
for (int i=1; i<maxAppAttempts; i++) {
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");
@ -525,7 +525,7 @@ public class TestRMAppTransitions {
}
// RUNNING => FAILED/RESTARTING event RMAppEventType.ATTEMPT_FAILED
// after max retries
// after max application attempts
RMAppEvent event =
new RMAppFailedAttemptEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_FAILED, "");

View File

@ -83,7 +83,7 @@ public class TestRMWebServicesApps extends JerseyTest {
bind(RMWebServices.class);
bind(GenericExceptionHandler.class);
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 2);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class);
rm = new MockRM(conf);
@ -871,9 +871,9 @@ public class TestRMWebServicesApps extends JerseyTest {
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
RMApp app1 = rm.submitApp(1024, "testwordcount", "user1");
amNodeManager.nodeHeartbeat(true);
int maxRetries = rm.getConfig().getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES);
int retriesLeft = maxRetries;
int maxAppAttempts = rm.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
int retriesLeft = maxAppAttempts;
while (--retriesLeft > 0) {
RMAppEvent event =
new RMAppFailedAttemptEvent(app1.getApplicationId(),
@ -882,7 +882,7 @@ public class TestRMWebServicesApps extends JerseyTest {
rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
amNodeManager.nodeHeartbeat(true);
}
assertEquals("incorrect number of attempts", maxRetries,
assertEquals("incorrect number of attempts", maxAppAttempts,
app1.getAppAttempts().values().size());
testAppAttemptsHelper(app1.getApplicationId().toString(), app1,
MediaType.APPLICATION_JSON);