diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index eb0978c8efc..28160be830f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -59,6 +59,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2013. The diagnostics is always the ExitCodeException stack when the
     container crashes. (Tsuyoshi OZAWA via junping_du)
 
+    YARN-2295. Refactored DistributedShell to use public APIs of protocol
+    records. (Li Lu via jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 6722307d696..5e1cbbcd932 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -94,7 +95,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -522,6 +522,8 @@ public class ApplicationMaster {
           + appAttemptID.toString(), e);
     }
 
+    // Note: the Credentials, Token, UserGroupInformation and DataOutputBuffer
+    // classes are marked as LimitedPrivate
     Credentials credentials =
         UserGroupInformation.getCurrentUser().getCredentials();
     DataOutputBuffer dob = new DataOutputBuffer();
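
Reviewer note: the LimitedPrivate comment added above refers to the token-serialization step that immediately follows in this method. A minimal sketch of that flow, assuming it mirrors the surrounding ApplicationMaster code (the wrapper class and method name here are hypothetical):

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

class TokenSetupSketch {
  // Serialize the current user's credentials into a ByteBuffer; the AM later
  // passes a duplicate of this buffer to every container it launches.
  static ByteBuffer collectTokens() throws IOException {
    Credentials credentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  }
}
```
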
@@ -900,11 +902,6 @@ public class ApplicationMaster {
     public void run() {
       LOG.info("Setting up container launch container for containerid="
           + container.getId());
-      ContainerLaunchContext ctx = Records
-          .newRecord(ContainerLaunchContext.class);
-
-      // Set the environment
-      ctx.setEnvironment(shellEnv);
 
       // Set the local resources
       Map<String, LocalResource> localResources =
           new HashMap<String, LocalResource>();
@@ -935,16 +932,13 @@ public class ApplicationMaster {
           return;
         }
 
-        LocalResource shellRsrc = Records.newRecord(LocalResource.class);
-        shellRsrc.setType(LocalResourceType.FILE);
-        shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+        URL yarnUrl = null;
         try {
-          shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
-              renamedScriptPath.toString())));
+          yarnUrl = ConverterUtils.getYarnUrlFromURI(
+              new URI(renamedScriptPath.toString()));
         } catch (URISyntaxException e) {
           LOG.error("Error when trying to use shell script path specified"
               + " in env, path=" + renamedScriptPath, e);
-
           // A failure scenario on bad input such as invalid shell script path
           // We know we cannot continue launching the container
           // so we should release it.
@@ -953,13 +947,13 @@ public class ApplicationMaster {
           numFailedContainers.incrementAndGet();
           return;
         }
-        shellRsrc.setTimestamp(shellScriptPathTimestamp);
-        shellRsrc.setSize(shellScriptPathLen);
+        LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
+            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+            shellScriptPathLen, shellScriptPathTimestamp);
         localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
             ExecShellStringPath, shellRsrc);
         shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
       }
-      ctx.setLocalResources(localResources);
 
       // Set the necessary command to execute on the allocated container
       Vector<CharSequence> vargs = new Vector<CharSequence>(5);
@@ -986,16 +980,18 @@ public class ApplicationMaster {
       List<String> commands = new ArrayList<String>();
       commands.add(command.toString());
-      ctx.setCommands(commands);
 
-      // Set up tokens for the container too. Today, for normal shell commands,
-      // the container in distribute-shell doesn't need any tokens. We are
-      // populating them mainly for NodeManagers to be able to download any
-      // files in the distributed file-system. The tokens are otherwise also
-      // useful in cases, for e.g., when one is running a "hadoop dfs" command
-      // inside the distributed shell.
-      ctx.setTokens(allTokens.duplicate());
+      // Set up the ContainerLaunchContext, passing local resources,
+      // environment, commands and tokens to the constructor.
+      // Note for tokens: set up tokens for the container too. Today, for
+      // normal shell commands, the container in distribute-shell doesn't need
+      // any tokens. We are populating them mainly for NodeManagers to be able
+      // to download any files in the distributed file-system. The tokens are
+      // otherwise also useful in cases, for e.g., when one is running a
+      // "hadoop dfs" command inside the distributed shell.
+      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
+          localResources, shellEnv, commands, null, allTokens.duplicate(), null);
 
       containerListener.addContainer(container.getId(), container);
       nmClientAsync.startContainerAsync(container, ctx);
     }
@@ -1024,15 +1020,13 @@ public class ApplicationMaster {
     // setup requirements for hosts
     // using * as any host will do for the distributed shell app
     // set the priority for the request
-    Priority pri = Records.newRecord(Priority.class);
     // TODO - what is the range for priority? how to decide?
-    pri.setPriority(requestPriority);
+    Priority pri = Priority.newInstance(requestPriority);
 
     // Set up resource type requirements
     // For now, memory and CPU are supported so we set memory and cpu requirements
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(containerMemory);
-    capability.setVirtualCores(containerVirtualCores);
+    Resource capability = Resource.newInstance(containerMemory,
+        containerVirtualCores);
 
     ContainerRequest request = new ContainerRequest(capability, null, null,
         pri);
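
Reviewer note: every hunk in this file follows one pattern, collapsing an internal Records.newRecord(...) allocation plus mutator calls into a single call to the record's public, stable newInstance factory. A minimal before/after sketch of the pattern, reusing the parameters from the request-setup hunk above (the wrapper class and method names are hypothetical):

```java
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;

class FactorySketch {
  // Old style: allocate an empty record through the internal Records utility,
  // then mutate it field by field.
  static Resource before(int containerMemory, int containerVirtualCores) {
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(containerMemory);
    capability.setVirtualCores(containerVirtualCores);
    return capability;
  }

  // New style: one call to the public factory; no dependency on Records.
  static Resource after(int containerMemory, int containerVirtualCores) {
    return Resource.newInstance(containerMemory, containerVirtualCores);
  }
}
```

The factory form also removes the window in which a half-initialized record could escape to other code.
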
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 3336ed97281..05fd883be93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -456,9 +456,6 @@ public class Client {
     appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
     appContext.setApplicationName(appName);
 
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
-
     // set local resources for the application master
     // local files or archives as needed
     // In this scenario, the jar file for the application master is part of the local resources
@@ -508,8 +505,6 @@ public class Client {
       addToLocalResources(fs, null, shellArgsPath, appId.toString(),
           localResources, StringUtils.join(shellArgs, " "));
     }
-    // Set local resource info into app master container launch context
-    amContainer.setLocalResources(localResources);
 
     // Set the necessary security tokens as needed
     //amContainer.setContainerTokens(containerToken);
@@ -550,8 +545,6 @@ public class Client {
 
     env.put("CLASSPATH", classPathEnv.toString());
 
-    amContainer.setEnvironment(env);
-
     // Set the necessary command to execute the application master
     Vector<CharSequence> vargs = new Vector<CharSequence>(30);
@@ -587,14 +580,15 @@ public class Client {
     LOG.info("Completed setting up app master command " + command.toString());
     List<String> commands = new ArrayList<String>();
     commands.add(command.toString());
-    amContainer.setCommands(commands);
+
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
+        localResources, env, commands, null, null, null);
 
     // Set up resource type requirements
     // For now, both memory and vcores are supported, so we set memory and
     // vcores requirements
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(amMemory);
-    capability.setVirtualCores(amVCores);
+    Resource capability = Resource.newInstance(amMemory, amVCores);
     appContext.setResource(capability);
 
     // Service data is a binary blob that can be passed to the application
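
Reviewer note: taken together, the Client changes above reduce AM submission to a chain of public factory calls. A condensed sketch of the resulting flow, assuming localResources, env and commands are built as in the surrounding code (the wrapper class and method are hypothetical; appContext is obtained from YarnClient in the real Client):

```java
import java.util.List;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;

class ClientSubmitSketch {
  static void configureAm(ApplicationSubmissionContext appContext,
      Map<String, LocalResource> localResources, Map<String, String> env,
      List<String> commands, int amMemory, int amVCores, int amPriority) {
    // Launch context in one call: serviceData, tokens and ACLs stay null
    // here; tokens are filled in later when security is enabled.
    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
        localResources, env, commands, null, null, null);
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(Resource.newInstance(amMemory, amVCores));
    appContext.setPriority(Priority.newInstance(amPriority));
  }
}
```
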
@@ -603,6 +597,7 @@ public class Client {
 
     // Setup security tokens
     if (UserGroupInformation.isSecurityEnabled()) {
+      // Note: the Credentials class is marked as LimitedPrivate for HDFS and MapReduce
       Credentials credentials = new Credentials();
       String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
       if (tokenRenewer == null || tokenRenewer.length() == 0) {
@@ -627,9 +622,8 @@ public class Client {
     appContext.setAMContainerSpec(amContainer);
 
     // Set the priority for the application master
-    Priority pri = Records.newRecord(Priority.class);
     // TODO - what is the range for priority? how to decide?
-    pri.setPriority(amPriority);
+    Priority pri = Priority.newInstance(amPriority);
     appContext.setPriority(pri);
 
     // Set the queue to which this application is to be submitted in the RM
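
Reviewer note: the secure-mode hunk above ends at the renewer check; the rest of that block is unchanged by the patch and therefore not shown. For context, a rough sketch of how such a block typically completes in the DistributedShell Client, fetching HDFS delegation tokens and attaching them to the AM container (treat the exact shape, and the wrapper class/method names, as assumptions):

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;

class ClientTokenSketch {
  static void addFsTokens(FileSystem fs, String tokenRenewer,
      ContainerLaunchContext amContainer) throws IOException {
    Credentials credentials = new Credentials();
    // Ask the file system for delegation tokens renewable by the RM, then
    // serialize them into the AM's launch context.
    Token<?>[] tokens = fs.addDelegationTokens(tokenRenewer, credentials);
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
    amContainer.setTokens(fsTokens);
  }
}
```
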