Merge -c 1460954 from trunk to branch-2 for YARN-498. Unmanaged AM launcher does not set various constants in env for an AM, also does not handle failed AMs properly (Hitesh Shah via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1460955 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7410551629
commit
ab3c4b7e68
|
@ -81,6 +81,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
YARN-378. Fix RM to make the AM max attempts/retries to be configurable
|
YARN-378. Fix RM to make the AM max attempts/retries to be configurable
|
||||||
per application by clients. (Zhijie Shen via vinodkv)
|
per application by clients. (Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
|
YARN-498. Unmanaged AM launcher does not set various constants in env for
|
||||||
|
an AM, also does not handle failed AMs properly. (Hitesh Shah via bikas)
|
||||||
|
|
||||||
Release 2.0.4-alpha - UNRELEASED
|
Release 2.0.4-alpha - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -319,10 +319,7 @@ public class ApplicationMaster {
|
||||||
|
|
||||||
Map<String, String> envs = System.getenv();
|
Map<String, String> envs = System.getenv();
|
||||||
|
|
||||||
if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
|
if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
|
||||||
appAttemptID = ConverterUtils.toApplicationAttemptId(envs
|
|
||||||
.get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
|
|
||||||
} else if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) {
|
|
||||||
if (cliParser.hasOption("app_attempt_id")) {
|
if (cliParser.hasOption("app_attempt_id")) {
|
||||||
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
|
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
|
||||||
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
|
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
|
||||||
|
@ -336,6 +333,23 @@ public class ApplicationMaster {
|
||||||
appAttemptID = containerId.getApplicationAttemptId();
|
appAttemptID = containerId.getApplicationAttemptId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
|
||||||
|
throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
|
||||||
|
+ " not set in the environment");
|
||||||
|
}
|
||||||
|
if (!envs.containsKey(ApplicationConstants.NM_HOST_ENV)) {
|
||||||
|
throw new RuntimeException(ApplicationConstants.NM_HOST_ENV
|
||||||
|
+ " not set in the environment");
|
||||||
|
}
|
||||||
|
if (!envs.containsKey(ApplicationConstants.NM_HTTP_PORT_ENV)) {
|
||||||
|
throw new RuntimeException(ApplicationConstants.NM_HTTP_PORT_ENV
|
||||||
|
+ " not set in the environment");
|
||||||
|
}
|
||||||
|
if (!envs.containsKey(ApplicationConstants.NM_PORT_ENV)) {
|
||||||
|
throw new RuntimeException(ApplicationConstants.NM_PORT_ENV
|
||||||
|
+ " not set in the environment");
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Application master for app" + ", appId="
|
LOG.info("Application master for app" + ", appId="
|
||||||
+ appAttemptID.getApplicationId().getId() + ", clustertimestamp="
|
+ appAttemptID.getApplicationId().getId() + ", clustertimestamp="
|
||||||
+ appAttemptID.getApplicationId().getClusterTimestamp()
|
+ appAttemptID.getApplicationId().getClusterTimestamp()
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
@ -81,6 +83,8 @@ public class UnmanagedAMLauncher {
|
||||||
// set the classpath explicitly
|
// set the classpath explicitly
|
||||||
private String classpath = null;
|
private String classpath = null;
|
||||||
|
|
||||||
|
private volatile boolean amCompleted = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param args
|
* @param args
|
||||||
* Command line arguments
|
* Command line arguments
|
||||||
|
@ -180,7 +184,17 @@ public class UnmanagedAMLauncher {
|
||||||
envAMList.add("CLASSPATH="+classpath);
|
envAMList.add("CLASSPATH="+classpath);
|
||||||
}
|
}
|
||||||
|
|
||||||
envAMList.add(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV + "=" + attemptId);
|
ContainerId containerId = Records.newRecord(ContainerId.class);
|
||||||
|
containerId.setApplicationAttemptId(attemptId);
|
||||||
|
containerId.setId(0);
|
||||||
|
|
||||||
|
String hostname = InetAddress.getLocalHost().getHostName();
|
||||||
|
envAMList.add(ApplicationConstants.AM_CONTAINER_ID_ENV + "=" + containerId);
|
||||||
|
envAMList.add(ApplicationConstants.NM_HOST_ENV + "=" + hostname);
|
||||||
|
envAMList.add(ApplicationConstants.NM_HTTP_PORT_ENV + "=0");
|
||||||
|
envAMList.add(ApplicationConstants.NM_PORT_ENV + "=0");
|
||||||
|
envAMList.add(ApplicationConstants.APP_SUBMIT_TIME_ENV + "="
|
||||||
|
+ System.currentTimeMillis());
|
||||||
|
|
||||||
String[] envAM = new String[envAMList.size()];
|
String[] envAM = new String[envAMList.size()];
|
||||||
Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM));
|
Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM));
|
||||||
|
@ -233,6 +247,8 @@ public class UnmanagedAMLauncher {
|
||||||
LOG.info("AM process exited with value: " + exitCode);
|
LOG.info("AM process exited with value: " + exitCode);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
} finally {
|
||||||
|
amCompleted = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -306,6 +322,7 @@ public class UnmanagedAMLauncher {
|
||||||
appReport = monitorApplication(appId, EnumSet.of(
|
appReport = monitorApplication(appId, EnumSet.of(
|
||||||
YarnApplicationState.KILLED, YarnApplicationState.FAILED,
|
YarnApplicationState.KILLED, YarnApplicationState.FAILED,
|
||||||
YarnApplicationState.FINISHED));
|
YarnApplicationState.FINISHED));
|
||||||
|
|
||||||
YarnApplicationState appState = appReport.getYarnApplicationState();
|
YarnApplicationState appState = appReport.getYarnApplicationState();
|
||||||
FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
|
FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus();
|
||||||
|
|
||||||
|
@ -341,6 +358,19 @@ public class UnmanagedAMLauncher {
|
||||||
private ApplicationReport monitorApplication(ApplicationId appId,
|
private ApplicationReport monitorApplication(ApplicationId appId,
|
||||||
Set<YarnApplicationState> finalState) throws YarnRemoteException {
|
Set<YarnApplicationState> finalState) throws YarnRemoteException {
|
||||||
|
|
||||||
|
long foundAMCompletedTime = 0;
|
||||||
|
final int timeToWaitMS = 10000;
|
||||||
|
StringBuilder expectedFinalState = new StringBuilder();
|
||||||
|
boolean first = true;
|
||||||
|
for (YarnApplicationState state : finalState) {
|
||||||
|
if (first) {
|
||||||
|
first = false;
|
||||||
|
expectedFinalState.append(state.name());
|
||||||
|
} else {
|
||||||
|
expectedFinalState.append("," + state.name());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
||||||
// Check app status every 1 second.
|
// Check app status every 1 second.
|
||||||
|
@ -370,8 +400,24 @@ public class UnmanagedAMLauncher {
|
||||||
return report;
|
return report;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wait for 10 seconds after process has completed for app report to
|
||||||
|
// come back
|
||||||
|
if (amCompleted) {
|
||||||
|
if (foundAMCompletedTime == 0) {
|
||||||
|
foundAMCompletedTime = System.currentTimeMillis();
|
||||||
|
} else if ((System.currentTimeMillis() - foundAMCompletedTime)
|
||||||
|
> timeToWaitMS) {
|
||||||
|
LOG.warn("Waited " + timeToWaitMS/1000
|
||||||
|
+ " seconds after process completed for AppReport"
|
||||||
|
+ " to reach desired final state. Not waiting anymore."
|
||||||
|
+ "CurrentState = " + state
|
||||||
|
+ ", ExpectedStates = " + expectedFinalState.toString());
|
||||||
|
throw new RuntimeException("Failed to receive final expected state"
|
||||||
|
+ " in ApplicationReport"
|
||||||
|
+ ", CurrentState=" + state
|
||||||
|
+ ", ExpectedStates=" + expectedFinalState.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
|
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
|
||||||
|
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -125,4 +127,40 @@ public class TestUnmanagedAMLauncher {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testDSShellError() throws Exception {
|
||||||
|
String classpath = getTestRuntimeClasspath();
|
||||||
|
String javaHome = System.getenv("JAVA_HOME");
|
||||||
|
if (javaHome == null) {
|
||||||
|
LOG.fatal("JAVA_HOME not defined. Test not running.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove shell command to make dist-shell fail in initialization itself
|
||||||
|
String[] args = {
|
||||||
|
"--classpath",
|
||||||
|
classpath,
|
||||||
|
"--queue",
|
||||||
|
"default",
|
||||||
|
"--cmd",
|
||||||
|
javaHome
|
||||||
|
+ "/bin/java -Xmx512m "
|
||||||
|
+ "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
|
||||||
|
+ "--container_memory 128 --num_containers 1 --priority 0" };
|
||||||
|
|
||||||
|
LOG.info("Initializing Launcher");
|
||||||
|
UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(
|
||||||
|
yarnCluster.getConfig()));
|
||||||
|
boolean initSuccess = launcher.init(args);
|
||||||
|
Assert.assertTrue(initSuccess);
|
||||||
|
LOG.info("Running Launcher");
|
||||||
|
|
||||||
|
try {
|
||||||
|
launcher.run();
|
||||||
|
fail("Expected an exception to occur as launch should have failed");
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
// Expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue