YARN-2295. Refactored DistributedShell to use public APIs of protocol records. Contributed by Li Lu
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612626 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c83c5b868e
commit
f6c723ff0c
|
@ -59,6 +59,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2013. The diagnostics is always the ExitCodeException stack when the container
|
YARN-2013. The diagnostics is always the ExitCodeException stack when the container
|
||||||
crashes. (Tsuyoshi OZAWA via junping_du)
|
crashes. (Tsuyoshi OZAWA via junping_du)
|
||||||
|
|
||||||
|
YARN-2295. Refactored DistributedShell to use public APIs of protocol records.
|
||||||
|
(Li Lu via jianhe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||||
|
@ -94,7 +95,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -522,6 +522,8 @@ public class ApplicationMaster {
|
||||||
+ appAttemptID.toString(), e);
|
+ appAttemptID.toString(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
|
||||||
|
// are marked as LimitedPrivate
|
||||||
Credentials credentials =
|
Credentials credentials =
|
||||||
UserGroupInformation.getCurrentUser().getCredentials();
|
UserGroupInformation.getCurrentUser().getCredentials();
|
||||||
DataOutputBuffer dob = new DataOutputBuffer();
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
@ -900,11 +902,6 @@ public class ApplicationMaster {
|
||||||
public void run() {
|
public void run() {
|
||||||
LOG.info("Setting up container launch container for containerid="
|
LOG.info("Setting up container launch container for containerid="
|
||||||
+ container.getId());
|
+ container.getId());
|
||||||
ContainerLaunchContext ctx = Records
|
|
||||||
.newRecord(ContainerLaunchContext.class);
|
|
||||||
|
|
||||||
// Set the environment
|
|
||||||
ctx.setEnvironment(shellEnv);
|
|
||||||
|
|
||||||
// Set the local resources
|
// Set the local resources
|
||||||
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
|
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
|
||||||
|
@ -935,16 +932,13 @@ public class ApplicationMaster {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
LocalResource shellRsrc = Records.newRecord(LocalResource.class);
|
URL yarnUrl = null;
|
||||||
shellRsrc.setType(LocalResourceType.FILE);
|
|
||||||
shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
|
|
||||||
try {
|
try {
|
||||||
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
|
yarnUrl = ConverterUtils.getYarnUrlFromURI(
|
||||||
renamedScriptPath.toString())));
|
new URI(renamedScriptPath.toString()));
|
||||||
} catch (URISyntaxException e) {
|
} catch (URISyntaxException e) {
|
||||||
LOG.error("Error when trying to use shell script path specified"
|
LOG.error("Error when trying to use shell script path specified"
|
||||||
+ " in env, path=" + renamedScriptPath, e);
|
+ " in env, path=" + renamedScriptPath, e);
|
||||||
|
|
||||||
// A failure scenario on bad input such as invalid shell script path
|
// A failure scenario on bad input such as invalid shell script path
|
||||||
// We know we cannot continue launching the container
|
// We know we cannot continue launching the container
|
||||||
// so we should release it.
|
// so we should release it.
|
||||||
|
@ -953,13 +947,13 @@ public class ApplicationMaster {
|
||||||
numFailedContainers.incrementAndGet();
|
numFailedContainers.incrementAndGet();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
shellRsrc.setTimestamp(shellScriptPathTimestamp);
|
LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
|
||||||
shellRsrc.setSize(shellScriptPathLen);
|
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
|
||||||
|
shellScriptPathLen, shellScriptPathTimestamp);
|
||||||
localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
|
localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
|
||||||
ExecShellStringPath, shellRsrc);
|
ExecShellStringPath, shellRsrc);
|
||||||
shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
|
shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
|
||||||
}
|
}
|
||||||
ctx.setLocalResources(localResources);
|
|
||||||
|
|
||||||
// Set the necessary command to execute on the allocated container
|
// Set the necessary command to execute on the allocated container
|
||||||
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
|
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
|
||||||
|
@ -986,16 +980,18 @@ public class ApplicationMaster {
|
||||||
|
|
||||||
List<String> commands = new ArrayList<String>();
|
List<String> commands = new ArrayList<String>();
|
||||||
commands.add(command.toString());
|
commands.add(command.toString());
|
||||||
ctx.setCommands(commands);
|
|
||||||
|
|
||||||
// Set up tokens for the container too. Today, for normal shell commands,
|
// Set up ContainerLaunchContext, setting local resource, environment,
|
||||||
// the container in distribute-shell doesn't need any tokens. We are
|
// command and token for constructor.
|
||||||
// populating them mainly for NodeManagers to be able to download any
|
|
||||||
// files in the distributed file-system. The tokens are otherwise also
|
|
||||||
// useful in cases, for e.g., when one is running a "hadoop dfs" command
|
|
||||||
// inside the distributed shell.
|
|
||||||
ctx.setTokens(allTokens.duplicate());
|
|
||||||
|
|
||||||
|
// Note for tokens: Set up tokens for the container too. Today, for normal
|
||||||
|
// shell commands, the container in distribute-shell doesn't need any
|
||||||
|
// tokens. We are populating them mainly for NodeManagers to be able to
|
||||||
|
// download anyfiles in the distributed file-system. The tokens are
|
||||||
|
// otherwise also useful in cases, for e.g., when one is running a
|
||||||
|
// "hadoop dfs" command inside the distributed shell.
|
||||||
|
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
|
||||||
|
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
|
||||||
containerListener.addContainer(container.getId(), container);
|
containerListener.addContainer(container.getId(), container);
|
||||||
nmClientAsync.startContainerAsync(container, ctx);
|
nmClientAsync.startContainerAsync(container, ctx);
|
||||||
}
|
}
|
||||||
|
@ -1024,15 +1020,13 @@ public class ApplicationMaster {
|
||||||
// setup requirements for hosts
|
// setup requirements for hosts
|
||||||
// using * as any host will do for the distributed shell app
|
// using * as any host will do for the distributed shell app
|
||||||
// set the priority for the request
|
// set the priority for the request
|
||||||
Priority pri = Records.newRecord(Priority.class);
|
|
||||||
// TODO - what is the range for priority? how to decide?
|
// TODO - what is the range for priority? how to decide?
|
||||||
pri.setPriority(requestPriority);
|
Priority pri = Priority.newInstance(requestPriority);
|
||||||
|
|
||||||
// Set up resource type requirements
|
// Set up resource type requirements
|
||||||
// For now, memory and CPU are supported so we set memory and cpu requirements
|
// For now, memory and CPU are supported so we set memory and cpu requirements
|
||||||
Resource capability = Records.newRecord(Resource.class);
|
Resource capability = Resource.newInstance(containerMemory,
|
||||||
capability.setMemory(containerMemory);
|
containerVirtualCores);
|
||||||
capability.setVirtualCores(containerVirtualCores);
|
|
||||||
|
|
||||||
ContainerRequest request = new ContainerRequest(capability, null, null,
|
ContainerRequest request = new ContainerRequest(capability, null, null,
|
||||||
pri);
|
pri);
|
||||||
|
|
|
@ -456,9 +456,6 @@ public class Client {
|
||||||
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
|
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
|
||||||
appContext.setApplicationName(appName);
|
appContext.setApplicationName(appName);
|
||||||
|
|
||||||
// Set up the container launch context for the application master
|
|
||||||
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
|
|
||||||
|
|
||||||
// set local resources for the application master
|
// set local resources for the application master
|
||||||
// local files or archives as needed
|
// local files or archives as needed
|
||||||
// In this scenario, the jar file for the application master is part of the local resources
|
// In this scenario, the jar file for the application master is part of the local resources
|
||||||
|
@ -508,8 +505,6 @@ public class Client {
|
||||||
addToLocalResources(fs, null, shellArgsPath, appId.toString(),
|
addToLocalResources(fs, null, shellArgsPath, appId.toString(),
|
||||||
localResources, StringUtils.join(shellArgs, " "));
|
localResources, StringUtils.join(shellArgs, " "));
|
||||||
}
|
}
|
||||||
// Set local resource info into app master container launch context
|
|
||||||
amContainer.setLocalResources(localResources);
|
|
||||||
|
|
||||||
// Set the necessary security tokens as needed
|
// Set the necessary security tokens as needed
|
||||||
//amContainer.setContainerTokens(containerToken);
|
//amContainer.setContainerTokens(containerToken);
|
||||||
|
@ -550,8 +545,6 @@ public class Client {
|
||||||
|
|
||||||
env.put("CLASSPATH", classPathEnv.toString());
|
env.put("CLASSPATH", classPathEnv.toString());
|
||||||
|
|
||||||
amContainer.setEnvironment(env);
|
|
||||||
|
|
||||||
// Set the necessary command to execute the application master
|
// Set the necessary command to execute the application master
|
||||||
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
|
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
|
||||||
|
|
||||||
|
@ -587,14 +580,15 @@ public class Client {
|
||||||
LOG.info("Completed setting up app master command " + command.toString());
|
LOG.info("Completed setting up app master command " + command.toString());
|
||||||
List<String> commands = new ArrayList<String>();
|
List<String> commands = new ArrayList<String>();
|
||||||
commands.add(command.toString());
|
commands.add(command.toString());
|
||||||
amContainer.setCommands(commands);
|
|
||||||
|
// Set up the container launch context for the application master
|
||||||
|
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
|
||||||
|
localResources, env, commands, null, null, null);
|
||||||
|
|
||||||
// Set up resource type requirements
|
// Set up resource type requirements
|
||||||
// For now, both memory and vcores are supported, so we set memory and
|
// For now, both memory and vcores are supported, so we set memory and
|
||||||
// vcores requirements
|
// vcores requirements
|
||||||
Resource capability = Records.newRecord(Resource.class);
|
Resource capability = Resource.newInstance(amMemory, amVCores);
|
||||||
capability.setMemory(amMemory);
|
|
||||||
capability.setVirtualCores(amVCores);
|
|
||||||
appContext.setResource(capability);
|
appContext.setResource(capability);
|
||||||
|
|
||||||
// Service data is a binary blob that can be passed to the application
|
// Service data is a binary blob that can be passed to the application
|
||||||
|
@ -603,6 +597,7 @@ public class Client {
|
||||||
|
|
||||||
// Setup security tokens
|
// Setup security tokens
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
// Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = new Credentials();
|
||||||
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
|
String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
|
||||||
if (tokenRenewer == null || tokenRenewer.length() == 0) {
|
if (tokenRenewer == null || tokenRenewer.length() == 0) {
|
||||||
|
@ -627,9 +622,8 @@ public class Client {
|
||||||
appContext.setAMContainerSpec(amContainer);
|
appContext.setAMContainerSpec(amContainer);
|
||||||
|
|
||||||
// Set the priority for the application master
|
// Set the priority for the application master
|
||||||
Priority pri = Records.newRecord(Priority.class);
|
|
||||||
// TODO - what is the range for priority? how to decide?
|
// TODO - what is the range for priority? how to decide?
|
||||||
pri.setPriority(amPriority);
|
Priority pri = Priority.newInstance(amPriority);
|
||||||
appContext.setPriority(pri);
|
appContext.setPriority(pri);
|
||||||
|
|
||||||
// Set the queue to which this application is to be submitted in the RM
|
// Set the queue to which this application is to be submitted in the RM
|
||||||
|
|
Loading…
Reference in New Issue