Merge r1612626 from trunk. YARN-2295. Refactored DistributedShell to use public APIs of protocol records. Contributed by Li Lu

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1612629 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-07-22 17:50:44 +00:00
parent c4537871dc
commit 82cd20f485
3 changed files with 32 additions and 41 deletions

View File

@ -41,6 +41,9 @@ Release 2.6.0 - UNRELEASED
YARN-2013. The diagnostics is always the ExitCodeException stack when the container YARN-2013. The diagnostics is always the ExitCodeException stack when the container
crashes. (Tsuyoshi OZAWA via junping_du) crashes. (Tsuyoshi OZAWA via junping_du)
YARN-2295. Refactored DistributedShell to use public APIs of protocol records.
(Li Lu via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -83,6 +83,7 @@
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@ -94,7 +95,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -522,6 +522,8 @@ public void run() throws YarnException, IOException {
+ appAttemptID.toString(), e); + appAttemptID.toString(), e);
} }
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
// are marked as LimitedPrivate
Credentials credentials = Credentials credentials =
UserGroupInformation.getCurrentUser().getCredentials(); UserGroupInformation.getCurrentUser().getCredentials();
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();
@ -900,11 +902,6 @@ public LaunchContainerRunnable(
public void run() { public void run() {
LOG.info("Setting up container launch container for containerid=" LOG.info("Setting up container launch container for containerid="
+ container.getId()); + container.getId());
ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class);
// Set the environment
ctx.setEnvironment(shellEnv);
// Set the local resources // Set the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
@ -935,16 +932,13 @@ public void run() {
return; return;
} }
LocalResource shellRsrc = Records.newRecord(LocalResource.class); URL yarnUrl = null;
shellRsrc.setType(LocalResourceType.FILE);
shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
try { try {
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI( yarnUrl = ConverterUtils.getYarnUrlFromURI(
renamedScriptPath.toString()))); new URI(renamedScriptPath.toString()));
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
LOG.error("Error when trying to use shell script path specified" LOG.error("Error when trying to use shell script path specified"
+ " in env, path=" + renamedScriptPath, e); + " in env, path=" + renamedScriptPath, e);
// A failure scenario on bad input such as invalid shell script path // A failure scenario on bad input such as invalid shell script path
// We know we cannot continue launching the container // We know we cannot continue launching the container
// so we should release it. // so we should release it.
@ -953,13 +947,13 @@ public void run() {
numFailedContainers.incrementAndGet(); numFailedContainers.incrementAndGet();
return; return;
} }
shellRsrc.setTimestamp(shellScriptPathTimestamp); LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
shellRsrc.setSize(shellScriptPathLen); LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
shellScriptPathLen, shellScriptPathTimestamp);
localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath : localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
ExecShellStringPath, shellRsrc); ExecShellStringPath, shellRsrc);
shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
} }
ctx.setLocalResources(localResources);
// Set the necessary command to execute on the allocated container // Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5); Vector<CharSequence> vargs = new Vector<CharSequence>(5);
@ -986,16 +980,18 @@ public void run() {
List<String> commands = new ArrayList<String>(); List<String> commands = new ArrayList<String>();
commands.add(command.toString()); commands.add(command.toString());
ctx.setCommands(commands);
// Set up tokens for the container too. Today, for normal shell commands, // Set up ContainerLaunchContext, setting local resource, environment,
// the container in distribute-shell doesn't need any tokens. We are // command and token for constructor.
// populating them mainly for NodeManagers to be able to download any
// files in the distributed file-system. The tokens are otherwise also
// useful in cases, for e.g., when one is running a "hadoop dfs" command
// inside the distributed shell.
ctx.setTokens(allTokens.duplicate());
// Note for tokens: Set up tokens for the container too. Today, for normal
// shell commands, the container in distribute-shell doesn't need any
// tokens. We are populating them mainly for NodeManagers to be able to
// download anyfiles in the distributed file-system. The tokens are
// otherwise also useful in cases, for e.g., when one is running a
// "hadoop dfs" command inside the distributed shell.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container); containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx); nmClientAsync.startContainerAsync(container, ctx);
} }
@ -1024,15 +1020,13 @@ private ContainerRequest setupContainerAskForRM() {
// setup requirements for hosts // setup requirements for hosts
// using * as any host will do for the distributed shell app // using * as any host will do for the distributed shell app
// set the priority for the request // set the priority for the request
Priority pri = Records.newRecord(Priority.class);
// TODO - what is the range for priority? how to decide? // TODO - what is the range for priority? how to decide?
pri.setPriority(requestPriority); Priority pri = Priority.newInstance(requestPriority);
// Set up resource type requirements // Set up resource type requirements
// For now, memory and CPU are supported so we set memory and cpu requirements // For now, memory and CPU are supported so we set memory and cpu requirements
Resource capability = Records.newRecord(Resource.class); Resource capability = Resource.newInstance(containerMemory,
capability.setMemory(containerMemory); containerVirtualCores);
capability.setVirtualCores(containerVirtualCores);
ContainerRequest request = new ContainerRequest(capability, null, null, ContainerRequest request = new ContainerRequest(capability, null, null,
pri); pri);

View File

@ -456,9 +456,6 @@ public boolean run() throws IOException, YarnException {
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName); appContext.setApplicationName(appName);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
// set local resources for the application master // set local resources for the application master
// local files or archives as needed // local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources // In this scenario, the jar file for the application master is part of the local resources
@ -508,8 +505,6 @@ public boolean run() throws IOException, YarnException {
addToLocalResources(fs, null, shellArgsPath, appId.toString(), addToLocalResources(fs, null, shellArgsPath, appId.toString(),
localResources, StringUtils.join(shellArgs, " ")); localResources, StringUtils.join(shellArgs, " "));
} }
// Set local resource info into app master container launch context
amContainer.setLocalResources(localResources);
// Set the necessary security tokens as needed // Set the necessary security tokens as needed
//amContainer.setContainerTokens(containerToken); //amContainer.setContainerTokens(containerToken);
@ -550,8 +545,6 @@ public boolean run() throws IOException, YarnException {
env.put("CLASSPATH", classPathEnv.toString()); env.put("CLASSPATH", classPathEnv.toString());
amContainer.setEnvironment(env);
// Set the necessary command to execute the application master // Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30); Vector<CharSequence> vargs = new Vector<CharSequence>(30);
@ -587,14 +580,15 @@ public boolean run() throws IOException, YarnException {
LOG.info("Completed setting up app master command " + command.toString()); LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>(); List<String> commands = new ArrayList<String>();
commands.add(command.toString()); commands.add(command.toString());
amContainer.setCommands(commands);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// Set up resource type requirements // Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and // For now, both memory and vcores are supported, so we set memory and
// vcores requirements // vcores requirements
Resource capability = Records.newRecord(Resource.class); Resource capability = Resource.newInstance(amMemory, amVCores);
capability.setMemory(amMemory);
capability.setVirtualCores(amVCores);
appContext.setResource(capability); appContext.setResource(capability);
// Service data is a binary blob that can be passed to the application // Service data is a binary blob that can be passed to the application
@ -603,6 +597,7 @@ public boolean run() throws IOException, YarnException {
// Setup security tokens // Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
// Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
Credentials credentials = new Credentials(); Credentials credentials = new Credentials();
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null || tokenRenewer.length() == 0) { if (tokenRenewer == null || tokenRenewer.length() == 0) {
@ -627,9 +622,8 @@ public boolean run() throws IOException, YarnException {
appContext.setAMContainerSpec(amContainer); appContext.setAMContainerSpec(amContainer);
// Set the priority for the application master // Set the priority for the application master
Priority pri = Records.newRecord(Priority.class);
// TODO - what is the range for priority? how to decide? // TODO - what is the range for priority? how to decide?
pri.setPriority(amPriority); Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri); appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM // Set the queue to which this application is to be submitted in the RM