YARN-1577. Made UnmanagedAMLauncher do launchAM after the attempt reaches the LAUNCHED state. Contributed by Jian He.

svn merge --ignore-ancestry -c 1580164 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1580165 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-03-22 04:37:46 +00:00
parent 21b5ec140a
commit ebd254117d
4 changed files with 90 additions and 32 deletions

View File

@ -543,6 +543,9 @@ Release 2.4.0 - UNRELEASED
YARN-1776. Fixed DelegationToken renewal to survive RM failover. (Zhijie
Shen via jianhe)
YARN-1577. Made UnmanagedAMLauncher do launchAM after the attempt reaches
the LAUNCHED state. (Jian He via zjshen)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -51,6 +52,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -77,7 +79,7 @@ public class UnmanagedAMLauncher {
private Configuration conf;
// Handle to talk to the Resource Manager/Applications Manager
private YarnClient rmClient;
protected YarnClient rmClient;
// Application master specific info to register a new Application with RM/ASM
private String appName = "";
@ -92,6 +94,7 @@ public class UnmanagedAMLauncher {
private volatile boolean amCompleted = false;
private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000;
/**
* @param args
* Command line arguments
@ -173,12 +176,6 @@ public boolean init(String[] args) throws ParseException {
public void launchAM(ApplicationAttemptId attemptId)
throws IOException, YarnException {
ApplicationReport report =
rmClient.getApplicationReport(attemptId.getApplicationId());
if (report.getYarnApplicationState() != YarnApplicationState.ACCEPTED) {
throw new YarnException(
"Umanaged AM must be in ACCEPTED state before launching");
}
Credentials credentials = new Credentials();
Token<AMRMTokenIdentifier> token =
rmClient.getAMRMToken(attemptId.getApplicationId());
@ -339,20 +336,27 @@ public boolean run() throws IOException, YarnException {
LOG.info("Submitting application to ASM");
rmClient.submitApplication(appContext);
// Monitor the application to wait for launch state
ApplicationReport appReport = monitorApplication(appId,
EnumSet.of(YarnApplicationState.ACCEPTED));
ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
LOG.info("Launching application with id: " + attemptId);
// launch AM
launchAM(attemptId);
// Monitor the application for end state
appReport = monitorApplication(appId, EnumSet.of(
ApplicationReport appReport =
monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED,
YarnApplicationState.KILLED, YarnApplicationState.FAILED,
YarnApplicationState.FINISHED));
if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
// Monitor the application attempt to wait for launch state
ApplicationAttemptReport attemptReport =
monitorCurrentAppAttempt(appId,
YarnApplicationAttemptState.LAUNCHED);
ApplicationAttemptId attemptId =
attemptReport.getApplicationAttemptId();
LOG.info("Launching AM with application attempt id " + attemptId);
// launch AM
launchAM(attemptId);
// Monitor the application for end state
appReport =
monitorApplication(appId, EnumSet.of(YarnApplicationState.KILLED,
YarnApplicationState.FAILED, YarnApplicationState.FINISHED));
}
YarnApplicationState appState = appReport.getYarnApplicationState();
FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
@ -376,6 +380,43 @@ public boolean run() throws IOException, YarnException {
}
}
private ApplicationAttemptReport monitorCurrentAppAttempt(
ApplicationId appId, YarnApplicationAttemptState attemptState)
throws YarnException, IOException {
long startTime = System.currentTimeMillis();
ApplicationAttemptId attemptId = null;
while (true) {
if (attemptId == null) {
attemptId =
rmClient.getApplicationReport(appId)
.getCurrentApplicationAttemptId();
}
ApplicationAttemptReport attemptReport = null;
if (attemptId != null) {
attemptReport = rmClient.getApplicationAttemptReport(attemptId);
if (attemptState.equals(attemptReport.getYarnApplicationAttemptState())) {
return attemptReport;
}
}
LOG.info("Current attempt state of " + appId + " is " + (attemptReport == null
? " N/A " : attemptReport.getYarnApplicationAttemptState())
+ ", waiting for current attempt to reach " + attemptState);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for current attempt of " + appId
+ " to reach " + attemptState);
}
if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) {
String errmsg =
"Timeout for waiting current attempt of " + appId + " to reach "
+ attemptState;
LOG.error(errmsg);
throw new RuntimeException(errmsg);
}
}
}
/**
* Monitor the submitted application for completion. Kill application if time
* expires.
@ -391,7 +432,6 @@ private ApplicationReport monitorApplication(ApplicationId appId,
IOException {
long foundAMCompletedTime = 0;
final int timeToWaitMS = 10000;
StringBuilder expectedFinalState = new StringBuilder();
boolean first = true;
for (YarnApplicationState state : finalState) {
@ -438,8 +478,8 @@ private ApplicationReport monitorApplication(ApplicationId appId,
if (foundAMCompletedTime == 0) {
foundAMCompletedTime = System.currentTimeMillis();
} else if ((System.currentTimeMillis() - foundAMCompletedTime)
> timeToWaitMS) {
LOG.warn("Waited " + timeToWaitMS/1000
> AM_STATE_WAIT_TIMEOUT_MS) {
LOG.warn("Waited " + AM_STATE_WAIT_TIMEOUT_MS/1000
+ " seconds after process completed for AppReport"
+ " to reach desired final state. Not waiting anymore."
+ "CurrentState = " + state

View File

@ -28,8 +28,6 @@
import java.io.OutputStream;
import java.net.URL;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -38,11 +36,15 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@ -122,7 +124,7 @@ private static String getTestRuntimeClasspath() {
}
@Test(timeout=30000)
public void testDSShell() throws Exception {
public void testUMALauncher() throws Exception {
String classpath = getTestRuntimeClasspath();
String javaHome = System.getenv("JAVA_HOME");
if (javaHome == null) {
@ -141,8 +143,18 @@ public void testDSShell() throws Exception {
+ " success" };
LOG.info("Initializing Launcher");
UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
yarnCluster.getConfig()));
UnmanagedAMLauncher launcher =
new UnmanagedAMLauncher(new Configuration(yarnCluster.getConfig())) {
public void launchAM(ApplicationAttemptId attemptId)
throws IOException, YarnException {
YarnApplicationAttemptState attemptState =
rmClient.getApplicationAttemptReport(attemptId)
.getYarnApplicationAttemptState();
Assert.assertTrue(attemptState
.equals(YarnApplicationAttemptState.LAUNCHED));
super.launchAM(attemptId);
}
};
boolean initSuccess = launcher.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running Launcher");
@ -154,7 +166,7 @@ public void testDSShell() throws Exception {
}
@Test(timeout=30000)
public void testDSShellError() throws Exception {
public void testUMALauncherError() throws Exception {
String classpath = getTestRuntimeClasspath();
String javaHome = System.getenv("JAVA_HOME");
if (javaHome == null) {

View File

@ -1650,11 +1650,14 @@ public ApplicationAttemptReport createApplicationAttemptReport() {
this.readLock.lock();
ApplicationAttemptReport attemptReport = null;
try {
// AM container maybe not yet allocated. and also unmangedAM doesn't have
// am container.
ContainerId amId =
masterContainer == null ? null : masterContainer.getId();
attemptReport = ApplicationAttemptReport.newInstance(this
.getAppAttemptId(), this.getHost(), this.getRpcPort(), this
.getTrackingUrl(), this.getDiagnostics(), YarnApplicationAttemptState
.valueOf(this.getState().toString()), this.getMasterContainer()
.getId());
.valueOf(this.getState().toString()), amId);
} finally {
this.readLock.unlock();
}