From b2f0fd896117c5703cc7e210f0bc1f629a74a6c4 Mon Sep 17 00:00:00 2001
From: Zhijie Shen <zjshen@apache.org>
Date: Tue, 12 Aug 2014 21:39:17 +0000
Subject: [PATCH] YARN-2317. Updated the document about how to write YARN
 applications. Contributed by Li Lu.

svn merge --ignore-ancestry -c 1617594 ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1617597 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-yarn-project/CHANGES.txt                    |    3 +
 .../site/apt/WritingYarnApplications.apt.vm        | 1317 ++++++++---------
 2 files changed, 644 insertions(+), 676 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ccac8fa7097..c1b4118ab51 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -101,6 +101,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2373. Changed WebAppUtils to use Configuration#getPassword for accessing
     SSL passwords. (Larry McCay via jianhe)
 
+    YARN-2317. Updated the document about how to write YARN applications. (Li Lu via
+    zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm
index 12e4c3ff334..57a47fda59d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm
@@ -11,8 +11,8 @@
 ~~ limitations under the License. See accompanying LICENSE file.
 
   ---
-  Hadoop Map Reduce Next Generation-${project.version} - Writing YARN
-  Applications
+  Hadoop Map Reduce Next Generation-${project.version} - Writing YARN
+  Applications
   ---
   ---
   ${maven.build.timestamp}
@@ -21,772 +21,737 @@ Hadoop MapReduce Next Generation - Writing YARN Applications

%{toc|section=1|fromDepth=0}

-* Purpose
+* Purpose

-  This document describes, at a high-level, the way to implement new
+  This document describes, at a high-level, the way to implement new
  Applications for YARN.

* Concepts and Flow

-  The general concept is that an 'Application Submission Client' submits an
-  'Application' to the YARN Resource Manager. The client communicates with the
-  ResourceManager using the 'ApplicationClientProtocol' to first acquire a new
-  'ApplicationId' if needed via ApplicationClientProtocol#getNewApplication and then
-  submit the 'Application' to be run via ApplicationClientProtocol#submitApplication. As
-  part of the ApplicationClientProtocol#submitApplication call, the client needs to
-  provide sufficient information to the ResourceManager to 'launch' the
-  application's first container i.e. the ApplicationMaster.
-  You need to provide information such as the details about the local
-  files/jars that need to be available for your application to run, the actual
-  command that needs to be executed (with the necessary command line arguments),
-  any Unix environment settings (optional), etc. Effectively, you need to
-  describe the Unix process(es) that needs to be launched for your
-  ApplicationMaster.

+  The general concept is that an <application submission client> submits an
+  <application> to the YARN <ResourceManager> (RM). This can be done through
+  setting up a <<<YarnClient>>> object. After <<<YarnClient>>> is started, the
+  client can then set up application context, prepare the very first container of
+  the application that contains the <ApplicationMaster> (AM), and then submit
+  the application.
+  You need to provide information such as the details about the
+  local files/jars that need to be available for your application to run, the
+  actual command that needs to be executed (with the necessary command line
+  arguments), any OS environment settings (optional), etc. Effectively, you
+  need to describe the Unix process(es) that needs to be launched for your
+  ApplicationMaster.

-  The YARN ResourceManager will then launch the ApplicationMaster (as specified)
-  on an allocated container. The ApplicationMaster is then expected to
-  communicate with the ResourceManager using the 'ApplicationMasterProtocol'. Firstly, the
-  ApplicationMaster needs to register itself with the ResourceManager. To
-  complete the task assigned to it, the ApplicationMaster can then request for
-  and receive containers via ApplicationMasterProtocol#allocate. After a container is
-  allocated to it, the ApplicationMaster communicates with the NodeManager using
-  ContainerManager#startContainer to launch the container for its task. As part
-  of launching this container, the ApplicationMaster has to specify the
-  ContainerLaunchContext which, similar to the ApplicationSubmissionContext,
-  has the launch information such as command line specification, environment,
-  etc. Once the task is completed, the ApplicationMaster has to signal the
-  ResourceManager of its completion via the ApplicationMasterProtocol#finishApplicationMaster.

+  The YARN ResourceManager will then launch the ApplicationMaster (as
+  specified) on an allocated container. The ApplicationMaster communicates with
+  the YARN cluster, and handles application execution. It performs operations in
+  an asynchronous fashion. During application launch time, the main tasks of the
+  ApplicationMaster are: a) communicating with the ResourceManager to negotiate
+  and allocate resources for future containers, and b) after container
+  allocation, communicating with YARN <NodeManager>s (NMs) to launch application
+  containers on them. Task a) can be performed asynchronously through an
+  <<<AMRMClientAsync>>> object, with event handling methods specified in a
+  <<<AMRMClientAsync.CallbackHandler>>> type of event handler. The event handler
+  needs to be set to the client explicitly. Task b) can be performed by launching
+  a runnable object that then launches containers when there are containers
+  allocated. As part of launching this container, the AM has to
+  specify the <<<ContainerLaunchContext>>> that has the launch information such as
+  command line specification, environment, etc.

-  Meanwhile, the client can monitor the application's status by querying the
-  ResourceManager or by directly querying the ApplicationMaster if it supports
-  such a service. If needed, it can also kill the application via
-  ApplicationClientProtocol#forceKillApplication.

+  During the execution of an application, the ApplicationMaster communicates
+  with NodeManagers through the <<<NMClientAsync>>> object. All container events
+  are handled by <<<NMClientAsync.CallbackHandler>>>, associated with
+  <<<NMClientAsync>>>. A typical callback handler handles container start, stop,
+  status update and error. The ApplicationMaster also reports execution progress
+  to the ResourceManager by handling the <<<getProgress()>>> method of
+  <<<AMRMClientAsync.CallbackHandler>>>.
+
+  Other than asynchronous clients, there are synchronous versions for certain
+  workflows (<<<AMRMClient>>> and <<<NMClient>>>). The asynchronous clients are
+  recommended because of their (subjectively) simpler usage, and this article
+  will mainly cover the asynchronous clients. Please refer to <<<AMRMClient>>>
+  and <<<NMClient>>> for more information on synchronous clients.
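  As a rough sketch of the flow just described (this condensed example is
  illustrative rather than taken from any sample application; the
  submission-context setup it elides, and names such as the configuration and
  queue, are developed step by step in the client walk-through below), a
  client's whole lifecycle looks like:

+---+
  // Condensed, illustrative client lifecycle; exception handling omitted.
  YarnClient yarnClient = YarnClient.createYarnClient();
  yarnClient.init(new YarnConfiguration());
  yarnClient.start();

  // Ask the RM for a new application and fill in its submission context.
  YarnClientApplication app = yarnClient.createApplication();
  ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
  ApplicationId appId = appContext.getApplicationId();
  // ... set name, queue, priority, resources and the AM's
  // ContainerLaunchContext here (see the client walk-through below) ...

  // Submit, then poll the RM until the application reaches a terminal state.
  yarnClient.submitApplication(appContext);
  YarnApplicationState state;
  do {
    Thread.sleep(1000);
    state = yarnClient.getApplicationReport(appId).getYarnApplicationState();
  } while (state != YarnApplicationState.FINISHED
      && state != YarnApplicationState.KILLED
      && state != YarnApplicationState.FAILED);
+---+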
-* Interfaces
+* Interfaces

  The interfaces you'd most likely be concerned with are:

-  * ApplicationClientProtocol - Client\<--\>ResourceManager\
-    The protocol for a client that wishes to communicate with the
-    ResourceManager to launch a new application (i.e. the ApplicationMaster),
-    check on the status of the application or kill the application. For example,
-    a job-client (a job launching program from the gateway) would use this
-    protocol.

+  * <Client>\<--\><ResourceManager>\
+    By using <<<YarnClient>>> objects.
+
+  * <ApplicationMaster>\<--\><ResourceManager>\
+    By using <<<AMRMClientAsync>>> objects, handling events asynchronously by
+    <<<AMRMClientAsync.CallbackHandler>>>
+
+  * <ApplicationMaster>\<--\><NodeManager>\
+    Launch containers. Communicate with NodeManagers
+    by using <<<NMClientAsync>>> objects, handling container events by
+    <<<NMClientAsync.CallbackHandler>>>
+
+  []
+
+  <<Note>>

-  * ApplicationMasterProtocol - ApplicationMaster\<--\>ResourceManager\
-    The protocol used by the ApplicationMaster to register/unregister itself
-    to/from the ResourceManager as well as to request for resources from the
-    Scheduler to complete its tasks.

+  * The three main protocols for YARN application (ApplicationClientProtocol,
+    ApplicationMasterProtocol and ContainerManagementProtocol) are still
+    preserved. The 3 clients wrap these 3 protocols to provide simpler
+    programming model for YARN applications.

-  * ContainerManager - ApplicationMaster\<--\>NodeManager\
-    The protocol used by the ApplicationMaster to talk to the NodeManager to
-    start/stop containers and get status updates on the containers if needed.

+  * Under very rare circumstances, programmer may want to directly use the 3
+    protocols to implement an application. However, note that <such behaviors
+    are no longer encouraged for general use cases>.
+
+  []

* Writing a Simple Yarn Application

** Writing a simple Client

-  * The first step that a client needs to do is to connect to the
-    ResourceManager or to be more specific, the ApplicationsManager (AsM)
-    interface of the ResourceManager.

+  * The first step that a client needs to do is to initialize and start a
+    YarnClient.

+---+
-    ApplicationClientProtocol applicationsManager;
-    YarnConfiguration yarnConf = new YarnConfiguration(conf);
-    InetSocketAddress rmAddress =
-        NetUtils.createSocketAddr(yarnConf.get(
-            YarnConfiguration.RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_ADDRESS));
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    configuration appsManagerServerConf = new Configuration(conf);
-    appsManagerServerConf.setClass(
-        YarnConfiguration.YARN_SECURITY_INFO,
-        ClientRMSecurityInfo.class, SecurityInfo.class);
-    applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
-        ApplicationClientProtocol.class, rmAddress, appsManagerServerConf));

+  YarnClient yarnClient = YarnClient.createYarnClient();
+  yarnClient.init(conf);
+  yarnClient.start();
+---+

-  * Once a handle is obtained to the ASM, the client needs to request the
-    ResourceManager for a new ApplicationId.

+  * Once a client is set up, the client needs to create an application, and get
+    its application id.

+---+
-    GetNewApplicationRequest request =
-        Records.newRecord(GetNewApplicationRequest.class);
-    GetNewApplicationResponse response =
-        applicationsManager.getNewApplication(request);
-    LOG.info("Got new ApplicationId=" + response.getApplicationId());

+  YarnClientApplication app = yarnClient.createApplication();
+  GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+---+
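  Before building the submission context, the same <<<YarnClient>>> can also be
  used to inspect the cluster it is submitting to. The snippet below is a sketch
  along the lines of what the distributed shell client logs; <<<amQueue>>> is
  assumed to hold the target queue name:

+---+
  // Optional sanity checks before constructing the resource asks.
  YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
  LOG.info("Got Cluster metric info from ASM"
      + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());

  List<NodeReport> clusterNodeReports =
      yarnClient.getNodeReports(NodeState.RUNNING);
  for (NodeReport node : clusterNodeReports) {
    LOG.info("Got node report from ASM for"
        + ", nodeId=" + node.getNodeId()
        + ", nodeAddress=" + node.getHttpAddress()
        + ", nodeNumContainers=" + node.getNumContainers());
  }

  QueueInfo queueInfo = yarnClient.getQueueInfo(amQueue);
  LOG.info("Queue info"
      + ", queueName=" + queueInfo.getQueueName()
      + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
      + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity());
+---+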
-  * The response from the ASM for a new application also contains information
-    about the cluster such as the minimum/maximum resource capabilities of the
-    cluster. This is required so that to ensure that you can correctly set the
-    specifications of the container in which the ApplicationMaster would be
-    launched. Please refer to GetNewApplicationResponse for more details.

+  * The response from the <<<YarnClientApplication>>> for a new application also
+    contains information about the cluster such as the minimum/maximum resource
+    capabilities of the cluster. This is required to ensure that you can
+    correctly set the specifications of the container in which the
+    ApplicationMaster would be launched. Please refer to
+    <<<GetNewApplicationResponse>>> for more details.

-  * The main crux of a client is to setup the ApplicationSubmissionContext
-    which defines all the information needed by the ResourceManager to launch
-    the ApplicationMaster. A client needs to set the following into the context:
-
-    * Application Info: id, name

+  * The main crux of a client is to setup the <<<ApplicationSubmissionContext>>>
+    which defines all the information needed by the RM to launch the AM. A client
+    needs to set the following into the context:

-    * Queue, Priority info: Queue to which the application will be submitted,
-      the priority to be assigned for the application.

+    * Application info: id, name

-    * User: The user submitting the application

+    * Queue, priority info: Queue to which the application will be submitted,
+      the priority to be assigned for the application.

-    * ContainerLaunchContext: The information defining the container in which
-      the ApplicationMaster will be launched and run. The
-      ContainerLaunchContext, as mentioned previously, defines all the required
-      information needed to run the ApplicationMaster such as the local
-      resources (binaries, jars, files etc.), security tokens, environment
-      settings (CLASSPATH etc.) and the command to be executed.
-
-    []

+    * User: The user submitting the application
+
+    * <<<ContainerLaunchContext>>>: The information defining the container in
+      which the AM will be launched and run. The <<<ContainerLaunchContext>>>, as
+      mentioned previously, defines all the required information needed to run
+      the application such as the local <<R>>esources (binaries, jars, files
+      etc.), <<E>>nvironment settings (CLASSPATH etc.), the <<C>>ommand to be
+      executed and security <<T>>okens (<RECT>).
+
+    []

+---+
-    // Create a new ApplicationSubmissionContext
-    ApplicationSubmissionContext appContext =
-        Records.newRecord(ApplicationSubmissionContext.class);
-    // set the ApplicationId
-    appContext.setApplicationId(appId);
-    // set the application name
-    appContext.setApplicationName(appName);
-
-    // Create a new container launch context for the AM's container
-    ContainerLaunchContext amContainer =
-        Records.newRecord(ContainerLaunchContext.class);

+  // set the application submission context
+  ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+  ApplicationId appId = appContext.getApplicationId();

-    // Define the local resources required
-    Map<String, LocalResource> localResources =
-        new HashMap<String, LocalResource>();
-    // Lets assume the jar we need for our ApplicationMaster is available in
-    // HDFS at a certain known path to us and we want to make it available to
-    // the ApplicationMaster in the launched container
-    Path jarPath; // <- known path to jar file
-    FileStatus jarStatus = fs.getFileStatus(jarPath);
-    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
-    // Set the type of resource - file or archive
-    // archives are untarred at the destination by the framework
-    amJarRsrc.setType(LocalResourceType.FILE);
-    // Set visibility of the resource
-    // Setting to most private option i.e. 
this file will only - // be visible to this instance of the running application - amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); - // Set the location of resource to be copied over into the - // working directory - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); - // Set timestamp and length of file so that the framework - // can do basic sanity checks for the local resource - // after it has been copied over to ensure it is the same - // resource the client intended to use with the application - amJarRsrc.setTimestamp(jarStatus.getModificationTime()); - amJarRsrc.setSize(jarStatus.getLen()); - // The framework will create a symlink called AppMaster.jar in the - // working directory that will be linked back to the actual file. - // The ApplicationMaster, if needs to reference the jar file, would - // need to use the symlink filename. - localResources.put("AppMaster.jar", amJarRsrc); - // Set the local resources into the launch context - amContainer.setLocalResources(localResources); + appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); + appContext.setApplicationName(appName); - // Set up the environment needed for the launch context - Map env = new HashMap(); - // For example, we could setup the classpath needed. - // Assuming our classes or jars are available as local resources in the - // working directory from which the command will be run, we need to append - // "." to the path. - // By default, all the hadoop specific classpaths will already be available - // in $CLASSPATH, so we should be careful not to overwrite it. - String classPathEnv = "$CLASSPATH:./*:"; - env.put("CLASSPATH", classPathEnv); - amContainer.setEnvironment(env); - - // Construct the command to be executed on the launched container - String command = - "${JAVA_HOME}" + /bin/java" + - " MyAppMaster" + - " arg1 arg2 arg3" + - " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + - " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"; + // set local resources for the application master + // local files or archives as needed + // In this scenario, the jar file for the application master is part of the local resources + Map localResources = new HashMap(); - List commands = new ArrayList(); - commands.add(command); - // add additional commands if needed + LOG.info("Copy App Master jar from local filesystem and add to local environment"); + // Copy the application master jar to the filesystem + // Create a local resource to point to the destination jar path + FileSystem fs = FileSystem.get(conf); + addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), + localResources, null); - // Set the command array into the container spec - amContainer.setCommands(commands); - - // Define the resource requirements for the container - // For now, YARN only supports memory so we set the memory - // requirements. - // If the process takes more than its allocated memory, it will - // be killed by the framework. - // Memory being requested for should be less than max capability - // of the cluster and all asks should be a multiple of the min capability. 
- Resource capability = Records.newRecord(Resource.class); - capability.setMemory(amMemory); - amContainer.setResource(capability); - - // Set the container launch content into the ApplicationSubmissionContext - appContext.setAMContainerSpec(amContainer); -+---+ + // Set the log4j properties if needed + if (!log4jPropFile.isEmpty()) { + addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), + localResources, null); + } - * After the setup process is complete, the client is finally ready to submit - the application to the ASM. - -+---+ - // Create the request to send to the ApplicationsManager - SubmitApplicationRequest appRequest = - Records.newRecord(SubmitApplicationRequest.class); - appRequest.setApplicationSubmissionContext(appContext); + // The shell script has to be made available on the final container(s) + // where it will be executed. + // To do this, we need to first copy into the filesystem that is visible + // to the yarn framework. + // We do not need to set this as a local resource for the application + // master as the application master does not need it. + String hdfsShellScriptLocation = ""; + long hdfsShellScriptLen = 0; + long hdfsShellScriptTimestamp = 0; + if (!shellScriptPath.isEmpty()) { + Path shellSrc = new Path(shellScriptPath); + String shellPathSuffix = + appName + "/" + appId.toString() + "/" + SCRIPT_PATH; + Path shellDst = + new Path(fs.getHomeDirectory(), shellPathSuffix); + fs.copyFromLocalFile(false, true, shellSrc, shellDst); + hdfsShellScriptLocation = shellDst.toUri().toString(); + FileStatus shellFileStatus = fs.getFileStatus(shellDst); + hdfsShellScriptLen = shellFileStatus.getLen(); + hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); + } - // Submit the application to the ApplicationsManager - // Ignore the response as either a valid response object is returned on - // success or an exception thrown to denote the failure - applicationsManager.submitApplication(appRequest); -+---+ - - * At this point, the ResourceManager will have accepted the application and - in the background, will go through the process of allocating a container - with the required specifications and then eventually setting up and - launching the ApplicationMaster on the allocated container. - - * There are multiple ways a client can track progress of the actual task. - - * It can communicate with the ResourceManager and request for a report of - the application via ApplicationClientProtocol#getApplicationReport. + if (!shellCommand.isEmpty()) { + addToLocalResources(fs, null, shellCommandPath, appId.toString(), + localResources, shellCommand); + } -+-----+ - GetApplicationReportRequest reportRequest = - Records.newRecord(GetApplicationReportRequest.class); - reportRequest.setApplicationId(appId); - GetApplicationReportResponse reportResponse = - applicationsManager.getApplicationReport(reportRequest); - ApplicationReport report = reportResponse.getApplicationReport(); -+-----+ - - The ApplicationReport received from the ResourceManager consists of the following: - - * General application information: ApplicationId, queue to which the - application was submitted, user who submitted the application and the - start time for the application. - - * ApplicationMaster details: the host on which the ApplicationMaster is - running, the rpc port (if any) on which it is listening for requests - from clients and a token that the client needs to communicate with - the ApplicationMaster. 
- - * Application tracking information: If the application supports some - form of progress tracking, it can set a tracking url which is - available via ApplicationReport#getTrackingUrl that a client can look - at to monitor progress. - - * ApplicationStatus: The state of the application as seen by the - ResourceManager is available via - ApplicationReport#getYarnApplicationState. If the - YarnApplicationState is set to FINISHED, the client should refer to - ApplicationReport#getFinalApplicationStatus to check for the actual - success/failure of the application task itself. In case of failures, - ApplicationReport#getDiagnostics may be useful to shed some more - light on the the failure. - - * If the ApplicationMaster supports it, a client can directly query the - ApplicationMaster itself for progress updates via the host:rpcport - information obtained from the ApplicationReport. It can also use the - tracking url obtained from the report if available. + if (shellArgs.length > 0) { + addToLocalResources(fs, null, shellArgsPath, appId.toString(), + localResources, StringUtils.join(shellArgs, " ")); + } - * In certain situations, if the application is taking too long or due to - other factors, the client may wish to kill the application. The - ApplicationClientProtocol supports the forceKillApplication call that allows a - client to send a kill signal to the ApplicationMaster via the - ResourceManager. An ApplicationMaster if so designed may also support an - abort call via its rpc layer that a client may be able to leverage. + // Set the env variables to be setup in the env where the application master will be run + LOG.info("Set the environment for the application master"); + Map env = new HashMap(); -+---+ - KillApplicationRequest killRequest = - Records.newRecord(KillApplicationRequest.class); - killRequest.setApplicationId(appId); - applicationsManager.forceKillApplication(killRequest); -+---+ + // put location of shell script into env + // using the env info, the application master will create the correct local resource for the + // eventual containers that will be launched to execute the shell scripts + env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); + env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); + env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); -** Writing an ApplicationMaster + // Add AppMaster.jar location to classpath + // At some point we should not be required to add + // the hadoop specific classpaths to the env. + // It should be provided out of the box. + // For now setting all required classpaths including + // the classpath to "." for the application jar + StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$()) + .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*"); + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) { + classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); + classPathEnv.append(c.trim()); + } + classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append( + "./log4j.properties"); - * The ApplicationMaster is the actual owner of the job. It will be launched - by the ResourceManager and via the client will be provided all the necessary - information and resources about the job that it has been tasked with to - oversee and complete. 
+ // Set the necessary command to execute the application master + Vector vargs = new Vector(30); - * As the ApplicationMaster is launched within a container that may (likely - will) be sharing a physical host with other containers, given the - multi-tenancy nature, amongst other issues, it cannot make any assumptions - of things like pre-configured ports that it can listen on. - - * When the ApplicationMaster starts up, several parameters are made available - to it via the environment. These include the ContainerId for the - ApplicationMaster container, the application submission time and details - about the NodeManager host running the Application Master. - Ref ApplicationConstants for parameter names. + // Set java executable command + LOG.info("Setting up app master command"); + vargs.add(Environment.JAVA_HOME.$$() + "/bin/java"); + // Set Xmx based on am memory size + vargs.add("-Xmx" + amMemory + "m"); + // Set class name + vargs.add(appMasterMainClass); + // Set params for Application Master + vargs.add("--container_memory " + String.valueOf(containerMemory)); + vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); + vargs.add("--num_containers " + String.valueOf(numContainers)); + vargs.add("--priority " + String.valueOf(shellCmdPriority)); - * All interactions with the ResourceManager require an ApplicationAttemptId - (there can be multiple attempts per application in case of failures). The - ApplicationAttemptId can be obtained from the ApplicationMaster - containerId. There are helper apis to convert the value obtained from the - environment into objects. - -+---+ - Map envs = System.getenv(); - String containerIdString = - envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV); - if (containerIdString == null) { - // container id should always be set in the env by the framework - throw new IllegalArgumentException( - "ContainerId not set in the environment"); + for (Map.Entry entry : shellEnv.entrySet()) { + vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); + } + if (debugFlag) { + vargs.add("--debug"); + } + + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); + + // Get final commmand + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(" "); + } + + LOG.info("Completed setting up app master command " + command.toString()); + List commands = new ArrayList(); + commands.add(command.toString()); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( + localResources, env, commands, null, null, null); + + // Set up resource type requirements + // For now, both memory and vcores are supported, so we set memory and + // vcores requirements + Resource capability = Resource.newInstance(amMemory, amVCores); + appContext.setResource(capability); + + // Service data is a binary blob that can be passed to the application + // Not needed in this scenario + // amContainer.setServiceData(serviceData); + + // Setup security tokens + if (UserGroupInformation.isSecurityEnabled()) { + // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce + Credentials credentials = new Credentials(); + String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); + if (tokenRenewer == null || tokenRenewer.length() == 0) { + throw new IOException( + "Can't get Master Kerberos principal for 
the RM to use as renewer"); } - ContainerId containerId = ConverterUtils.toContainerId(containerIdString); - ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId(); -+---+ - - * After an ApplicationMaster has initialized itself completely, it needs to - register with the ResourceManager via - ApplicationMasterProtocol#registerApplicationMaster. The ApplicationMaster always - communicate via the Scheduler interface of the ResourceManager. - -+---+ - // Connect to the Scheduler of the ResourceManager. - YarnConfiguration yarnConf = new YarnConfiguration(conf); - InetSocketAddress rmAddress = - NetUtils.createSocketAddr(yarnConf.get( - YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); - LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationMasterProtocol resourceManager = - (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, conf); - // Register the AM with the RM - // Set the required info into the registration request: - // ApplicationAttemptId, - // host on which the app master is running - // rpc port on which the app master accepts requests from the client - // tracking url for the client to track app master progress - RegisterApplicationMasterRequest appMasterRequest = - Records.newRecord(RegisterApplicationMasterRequest.class); - appMasterRequest.setApplicationAttemptId(appAttemptID); - appMasterRequest.setHost(appMasterHostname); - appMasterRequest.setRpcPort(appMasterRpcPort); - appMasterRequest.setTrackingUrl(appMasterTrackingUrl); + // For now, only getting tokens for the default file-system. + final Token tokens[] = + fs.addDelegationTokens(tokenRenewer, credentials); + if (tokens != null) { + for (Token token : tokens) { + LOG.info("Got dt for " + fs.getUri() + "; " + token); + } + } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + amContainer.setTokens(fsTokens); + } - // The registration response is useful as it provides information about the - // cluster. - // Similar to the GetNewApplicationResponse in the client, it provides - // information about the min/mx resource capabilities of the cluster that - // would be needed by the ApplicationMaster when requesting for containers. - RegisterApplicationMasterResponse response = - resourceManager.registerApplicationMaster(appMasterRequest); + appContext.setAMContainerSpec(amContainer); +---+ - - * The ApplicationMaster has to emit heartbeats to the ResourceManager to keep - it informed that the ApplicationMaster is alive and still running. The - timeout expiry interval at the ResourceManager is defined by a config - setting accessible via YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS with the - default being defined by YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS. - The ApplicationMasterProtocol#allocate calls to the ResourceManager count as heartbeats - as it also supports sending progress update information. Therefore, an - allocate call with no containers requested and progress information updated - if any is a valid way for making heartbeat calls to the ResourceManager. - - * Based on the task requirements, the ApplicationMaster can ask for a set of - containers to run its tasks on. The ApplicationMaster has to use the - ResourceRequest class to define the following container specifications: - - * Hostname: If containers are required to be hosted on a particular rack or - a specific host. 
'*' is a special value that implies any host will do.

-    * Resource capability: Currently, YARN only supports memory based resource
-      requirements so the request should define how much memory is needed. The
-      value is defined in MB and has to less than the max capability of the

+  * After the setup process is complete, the client is ready to submit
+    the application with specified priority and queue.

+---+
+  // Set the priority for the application master
+  Priority pri = Priority.newInstance(amPriority);
+  appContext.setPriority(pri);
+
+  // Set the queue to which this application is to be submitted in the RM
+  appContext.setQueue(amQueue);
+
+  // Submit the application to the applications manager
+  // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
+
+  yarnClient.submitApplication(appContext);
+---+

+  * At this point, the RM will have accepted the application and in the
+    background, will go through the process of allocating a container with the
+    required specifications and then eventually setting up and launching the AM
+    on the allocated container.
+
+  * There are multiple ways a client can track progress of the actual task.
+
+    * It can communicate with the RM and request for a report of the application
+      via the <<<getApplicationReport()>>> method of <<<YarnClient>>>.

+-----+
+  // Get application report for the appId we are interested in
+  ApplicationReport report = yarnClient.getApplicationReport(appId);
+-----+

+      The <<<ApplicationReport>>> received from the RM consists of the following:
+
+      * General application information: Application id, queue to which the
+        application was submitted, user who submitted the application and the
+        start time for the application.
+
+      * ApplicationMaster details: the host on which the AM is running, the
+        rpc port (if any) on which it is listening for requests from clients
+        and a token that the client needs to communicate with the AM.
+
+      * Application tracking information: If the application supports some form
+        of progress tracking, it can set a tracking url which is available via
+        <<<ApplicationReport>>>'s <<<getTrackingUrl()>>> method that a client
+        can look at to monitor progress.
+
+      * Application status: The state of the application as seen by the
+        ResourceManager is available via
+        <<<ApplicationReport#getYarnApplicationState>>>. If the
+        <<<YarnApplicationState>>> is set to <<<FINISHED>>>, the client should
+        refer to <<<ApplicationReport#getFinalApplicationStatus>>> to check for
+        the actual success/failure of the application task itself. In case of
+        failures, <<<ApplicationReport#getDiagnostics>>> may be useful to shed
+        some more light on the failure.
+
+    * If the ApplicationMaster supports it, a client can directly query the AM
+      itself for progress updates via the host:rpcport information obtained from
+      the application report. It can also use the tracking url obtained from the
+      report if available.
+
+  * In certain situations, if the application is taking too long or due to other
+    factors, the client may wish to kill the application. <<<YarnClient>>>
+    supports the <<<killApplication>>> call that allows a client to send a kill
+    signal to the AM via the ResourceManager. An ApplicationMaster, if so
+    designed, may also support an abort call via its rpc layer that a client may
+    be able to leverage.

+---+
+  yarnClient.killApplication(appId);
+---+
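  The client snippets above call a helper named <<<addToLocalResources>>>
  without showing its body. A sketch of such a helper, mirroring the one in the
  distributed shell sample (the <<<appName>>> field is an assumed member of the
  client), copies a local file (or writes a literal string) into HDFS and
  registers the result as a <<<LocalResource>>>:

+---+
  private void addToLocalResources(FileSystem fs, String fileSrcPath,
      String fileDstPath, String appId, Map<String, LocalResource> localResources,
      String resources) throws IOException {
    String suffix = appName + "/" + appId + "/" + fileDstPath;
    Path dst = new Path(fs.getHomeDirectory(), suffix);
    if (fileSrcPath == null) {
      // No local file: materialize the given string as the resource contents.
      FSDataOutputStream ostream = null;
      try {
        ostream = FileSystem.create(fs, dst, new FsPermission((short) 0710));
        ostream.writeUTF(resources);
      } finally {
        IOUtils.closeQuietly(ostream);
      }
    } else {
      fs.copyFromLocalFile(new Path(fileSrcPath), dst);
    }
    // Record size and timestamp so the NodeManager can verify its copy.
    FileStatus scFileStatus = fs.getFileStatus(dst);
    LocalResource scRsrc =
        LocalResource.newInstance(
            ConverterUtils.getYarnUrlFromURI(dst.toUri()),
            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
            scFileStatus.getLen(), scFileStatus.getModificationTime());
    localResources.put(fileDstPath, scRsrc);
  }
+---+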
+** Writing an ApplicationMaster (AM)
+
+  * The AM is the actual owner of the job. It will be launched
+    by the RM and via the client will be provided all the
+    necessary information and resources about the job that it has been tasked
+    with to oversee and complete.
+
+  * As the AM is launched within a container that may (likely
+    will) be sharing a physical host with other containers, given the
+    multi-tenancy nature, amongst other issues, it cannot make any assumptions
+    of things like pre-configured ports that it can listen on.
+
+  * When the AM starts up, several parameters are made available
+    to it via the environment. These include the <<<ContainerId>>> for the
+    AM container, the application submission time and details
+    about the NM (NodeManager) host running the ApplicationMaster.
+    Ref <<<ApplicationConstants>>> for parameter names.
+
+  * All interactions with the RM require an <<<ApplicationAttemptId>>> (there can
+    be multiple attempts per application in case of failures). The
+    <<<ApplicationAttemptId>>> can be obtained from the AM's container id. There
+    are helper APIs to convert the value obtained from the environment into
+    objects.

+---+
+  Map<String, String> envs = System.getenv();
+  String containerIdString =
+      envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
+  if (containerIdString == null) {
+    // container id should always be set in the env by the framework
+    throw new IllegalArgumentException(
+        "ContainerId not set in the environment");
+  }
+  ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
+  ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+---+

+  * After an AM has initialized itself completely, we can start the two clients:
+    one to ResourceManager, and one to NodeManagers. We set them up with our
+    customized event handler, and we will talk about those event handlers in
+    detail later in this article.

+---+
+  AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+  amRMClient.init(conf);
+  amRMClient.start();
+
+  containerListener = createNMCallbackHandler();
+  nmClientAsync = new NMClientAsyncImpl(containerListener);
+  nmClientAsync.init(conf);
+  nmClientAsync.start();
+---+

+  * The AM has to emit heartbeats to the RM to keep it informed that the AM is
+    alive and still running. The timeout expiry interval at the RM is defined by
+    a config setting accessible via
+    <<<YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS>>> with the default being
+    defined by <<<YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS>>>. The
+    ApplicationMaster needs to register itself with the ResourceManager to
+    start heartbeating.

+---+
+  // Register self with ResourceManager
+  // This will start heartbeating to the RM
+  appMasterHostname = NetUtils.getHostname();
+  RegisterApplicationMasterResponse response = amRMClient
+      .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+          appMasterTrackingUrl);
+---+

+  * The response of the registration includes the maximum resource capability
+    of the cluster. You may want to use this to check the application's request.

+---+
+  // Dump out information about cluster capability as seen by the
+  // resource manager
+  int maxMem = response.getMaximumResourceCapability().getMemory();
+  LOG.info("Max mem capability of resources in this cluster " + maxMem);
+
+  int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
+  LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
+
+  // A resource ask cannot exceed the max.
+  if (containerMemory > maxMem) {
+    LOG.info("Container memory specified above max threshold of cluster."
+        + " Using max value." + ", specified=" + containerMemory + ", max="
+        + maxMem);
+    containerMemory = maxMem;
+  }
+
+  if (containerVirtualCores > maxVCores) {
+    LOG.info("Container virtual cores specified above max threshold of cluster."
+        + " Using max value." + ", specified=" + containerVirtualCores + ", max="
+        + maxVCores);
+    containerVirtualCores = maxVCores;
+  }
+
+  List<Container> previousAMRunningContainers =
+      response.getContainersFromPreviousAttempts();
+  LOG.info("Received " + previousAMRunningContainers.size()
+      + " previous AM's running containers on AM registration.");
+---+
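  The <<<RMCallbackHandler>>> created above has to implement every method of
  the <<<AMRMClientAsync.CallbackHandler>>> interface. A bare-bones skeleton
  (the two interesting callbacks are filled in later in this article) could
  look like:

+---+
  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
    @Override
    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
      // Inspect exit statuses; re-request containers for retriable failures.
    }

    @Override
    public void onContainersAllocated(List<Container> allocatedContainers) {
      // Launch the allocated containers (shown below).
    }

    @Override
    public void onShutdownRequest() {
      // The RM asked this AM to shut down; stop the application.
    }

    @Override
    public void onNodesUpdated(List<NodeReport> updatedNodes) {}

    @Override
    public float getProgress() {
      // Progress reported to the RM on each heartbeat (shown below).
      return 0;
    }

    @Override
    public void onError(Throwable e) {
      amRMClient.stop();
    }
  }
+---+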
+ ", specified=" + containerVirtualCores + ", max=" + + maxVCores); + containerVirtualCores = maxVCores; + } + List previousAMRunningContainers = + response.getContainersFromPreviousAttempts(); + LOG.info("Received " + previousAMRunningContainers.size() + + " previous AM's running containers on AM registration."); ++---+ + + * Based on the task requirements, the AM can ask for a set of containers to run + its tasks on. We can now calculate how many containers we need, and request + those many containers. + ++---+ + List previousAMRunningContainers = + response.getContainersFromPreviousAttempts(); + List previousAMRunningContainers = + response.getContainersFromPreviousAttempts(); + LOG.info("Received " + previousAMRunningContainers.size() + + " previous AM's running containers on AM registration."); + + int numTotalContainersToRequest = + numTotalContainers - previousAMRunningContainers.size(); + // Setup ask for containers from RM + // Send request for containers to RM + // Until we get our fully allocated quota, we keep on polling RM for + // containers + // Keep looping until all the containers are launched and shell script + // executed on them ( regardless of success/failure). + for (int i = 0; i < numTotalContainersToRequest; ++i) { + ContainerRequest containerAsk = setupContainerAskForRM(); + amRMClient.addContainerRequest(containerAsk); + } ++---+ + + * In <<>>, the follow two things need some set up: + + * Resource capability: Currently, YARN supports memory based resource + requirements so the request should define how much memory is needed. The + value is defined in MB and has to less than the max capability of the cluster and an exact multiple of the min capability. Memory resources - correspond to physical memory limits imposed on the task containers. - - * Priority: When asking for sets of containers, an ApplicationMaster may - define different priorities to each set. For example, the Map-Reduce - ApplicationMaster may assign a higher priority to containers needed - for the Map tasks and a lower priority for the Reduce tasks' containers. - - [] - -+----+ - // Resource Request - ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class); + correspond to physical memory limits imposed on the task containers. It + will also support computation based resource (vCore), as shown in the code. - // setup requirements for hosts - // whether a particular rack/host is needed - // useful for applications that are sensitive - // to data locality - rsrcRequest.setHostName("*"); + * Priority: When asking for sets of containers, an AM may define different + priorities to each set. For example, the Map-Reduce AM may assign a higher + priority to containers needed for the Map tasks and a lower priority for + the Reduce tasks' containers. + [] + ++---+ + private ContainerRequest setupContainerAskForRM() { + // setup requirements for hosts + // using * as any host will do for the distributed shell app // set the priority for the request - Priority pri = Records.newRecord(Priority.class); - pri.setPriority(requestPriority); - rsrcRequest.setPriority(pri); + Priority pri = Priority.newInstance(requestPriority); // Set up resource type requirements - // For now, only memory is supported so we set memory requirements - Resource capability = Records.newRecord(Resource.class); - capability.setMemory(containerMemory); - rsrcRequest.setCapability(capability); - - // set no. 
of containers needed - // matching the specifications - rsrcRequest.setNumContainers(numContainers); -+---+ - - * After defining the container requirements, the ApplicationMaster has to - construct an AllocateRequest to send to the ResourceManager. - The AllocateRequest consists of: - - * Requested containers: The container specifications and the no. of - containers being requested for by the ApplicationMaster from the - ResourceManager. - - * Released containers: There may be situations when the ApplicationMaster - may have requested for more containers that it needs or due to failure - issues, decide to use other containers allocated to it. In all such - situations, it is beneficial to the cluster if the ApplicationMaster - releases these containers back to the ResourceManager so that they can be - re-allocated to other applications. - - * ResponseId: The response id that will be sent back in the response from - the allocate call. - - * Progress update information: The ApplicationMaster can send its progress - update (range between to 0 to 1) to the ResourceManager. - - [] - -+---+ - List requestedContainers; - List releasedContainers - AllocateRequest req = Records.newRecord(AllocateRequest.class); - - // The response id set in the request will be sent back in - // the response so that the ApplicationMaster can - // match it to its original ask and act appropriately. - req.setResponseId(rmRequestID); - - // Set ApplicationAttemptId - req.setApplicationAttemptId(appAttemptID); - - // Add the list of containers being asked for - req.addAllAsks(requestedContainers); - - // If the ApplicationMaster has no need for certain - // containers due to over-allocation or for any other - // reason, it can release them back to the ResourceManager - req.addAllReleases(releasedContainers); - - // Assuming the ApplicationMaster can track its progress - req.setProgress(currentProgress); - - AllocateResponse allocateResponse = resourceManager.allocate(req); -+---+ - - * The AllocateResponse sent back from the ResourceManager provides the - following information: - - * Reboot flag: For scenarios when the ApplicationMaster may get out of sync - with the ResourceManager. - - * Allocated containers: The containers that have been allocated to the - ApplicationMaster. - - * Headroom: Headroom for resources in the cluster. Based on this information - and knowing its needs, an ApplicationMaster can make intelligent decisions - such as re-prioritizing sub-tasks to take advantage of currently allocated - containers, bailing out faster if resources are not becoming available - etc. - - * Completed containers: Once an ApplicationMaster triggers a launch an - allocated container, it will receive an update from the ResourceManager - when the container completes. The ApplicationMaster can look into the - status of the completed container and take appropriate actions such as - re-trying a particular sub-task in case of a failure. - - * Number of cluster nodes: The number of hosts available on the cluster. - - [] - - One thing to note is that containers will not be immediately allocated to - the ApplicationMaster. This does not imply that the ApplicationMaster should - keep on asking the pending count of required containers. Once an allocate - request has been sent, the ApplicationMaster will eventually be allocated - the containers based on cluster capacity, priorities and the scheduling - policy in place. 
The ApplicationMaster should only request for containers
-  again if and only if its original estimate changed and it needs additional
-  containers.

+---+
-    // Retrieve list of allocated containers from the response
-    // and on each allocated container, lets assume we are launching
-    // the same job.
-    List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();

+  * After container allocation requests have been sent by the application
+    master, containers will be launched asynchronously, by the event handler of
+    the <<<AMRMClientAsync>>> client. The handler should implement the
+    <<<AMRMClientAsync.CallbackHandler>>> interface.
+
+  * When there are containers allocated, the handler sets up a thread that runs
+    the code to launch containers. Here we use the name
+    <<<LaunchContainerRunnable>>> to demonstrate. We will talk about the
+    <<<LaunchContainerRunnable>>> class in the following part of this article.

+---+
+  @Override
+  public void onContainersAllocated(List<Container> allocatedContainers) {
+    LOG.info("Got response from RM for container ask, allocatedCnt="
+        + allocatedContainers.size());
+    numAllocatedContainers.addAndGet(allocatedContainers.size());
     for (Container allocatedContainer : allocatedContainers) {
-      LOG.info("Launching shell command on a new container."
-          + ", containerId=" + allocatedContainer.getId()
-          + ", containerNode=" + allocatedContainer.getNodeId().getHost()
-          + ":" + allocatedContainer.getNodeId().getPort()
-          + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
-          + ", containerState" + allocatedContainer.getState()
-          + ", containerResourceMemory"
-          + allocatedContainer.getResource().getMemory());
-
-      // Launch and start the container on a separate thread to keep the main
-      // thread unblocked as all containers may not be allocated at one go.
-      LaunchContainerRunnable runnableLaunchContainer =
-          new LaunchContainerRunnable(allocatedContainer);
-      Thread launchThread = new Thread(runnableLaunchContainer);
+      LaunchContainerRunnable runnableLaunchContainer =
+          new LaunchContainerRunnable(allocatedContainer, containerListener);
+      Thread launchThread = new Thread(runnableLaunchContainer);
+
+      // launch and start the container on a separate thread to keep
+      // the main thread unblocked
+      // as all containers may not be allocated at one go.
       launchThreads.add(launchThread);
       launchThread.start();
     }

-    // Check what the current available resources in the cluster are
-    Resource availableResources = allocateResponse.getAvailableResources();
-    // Based on this information, an ApplicationMaster can make appropriate
-    // decisions
-
-    // Check the completed containers
-    // Let's assume we are keeping a count of total completed containers,
-    // containers that failed and ones that completed successfully. 
- List completedContainers = - allocateResponse.getCompletedContainersStatuses(); - for (ContainerStatus containerStatus : completedContainers) { - LOG.info("Got container status for containerID= " - + containerStatus.getContainerId() - + ", state=" + containerStatus.getState() - + ", exitStatus=" + containerStatus.getExitStatus() - + ", diagnostics=" + containerStatus.getDiagnostics()); - - int exitStatus = containerStatus.getExitStatus(); - if (0 != exitStatus) { - // container failed - // -100 is a special case where the container - // was aborted/pre-empted for some reason - if (-100 != exitStatus) { - // application job on container returned a non-zero exit code - // counts as completed - numCompletedContainers.incrementAndGet(); - numFailedContainers.incrementAndGet(); - } - else { - // something else bad happened - // app job did not complete for some reason - // we should re-try as the container was lost for some reason - // decrementing the requested count so that we ask for an - // additional one in the next allocate call. - numRequestedContainers.decrementAndGet(); - // we do not need to release the container as that has already - // been done by the ResourceManager/NodeManager. - } - } - else { - // nothing to do - // container completed successfully - numCompletedContainers.incrementAndGet(); - numSuccessfulContainers.incrementAndGet(); - } - } - } -+---+ - - - * After a container has been allocated to the ApplicationMaster, it needs to - follow a similar process that the Client followed in setting up the - ContainerLaunchContext for the eventual task that is going to be running on - the allocated Container. Once the ContainerLaunchContext is defined, the - ApplicationMaster can then communicate with the ContainerManager to start - its allocated container. - -+---+ - - //Assuming an allocated Container obtained from AllocateResponse - Container container; - // Connect to ContainerManager on the allocated container - String cmIpPortStr = container.getNodeId().getHost() + ":" - + container.getNodeId().getPort(); - InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); - ContainerManager cm = - (ContainerManager)rpc.getProxy(ContainerManager.class, cmAddress, conf); - - // Now we setup a ContainerLaunchContext - ContainerLaunchContext ctx = - Records.newRecord(ContainerLaunchContext.class); - - ctx.setContainerId(container.getId()); - ctx.setResource(container.getResource()); - - try { - ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); - } catch (IOException e) { - LOG.info( - "Getting current user failed when trying to launch the container", - + e.getMessage()); - } - - // Set the environment - Map unixEnv; - // Setup the required env. - // Please note that the launched container does not inherit - // the environment of the ApplicationMaster so all the - // necessary environment settings will need to be re-setup - // for this allocated container. - ctx.setEnvironment(unixEnv); - - // Set the local resources - Map localResources = - new HashMap(); - // Again, the local resources from the ApplicationMaster is not copied over - // by default to the allocated container. Thus, it is the responsibility - // of the ApplicationMaster to setup all the necessary local resources - // needed by the job that will be executed on the allocated container. - - // Assume that we are executing a shell script on the allocated container - // and the shell script's location in the filesystem is known to us. 
-    Path shellScriptPath;
-    LocalResource shellRsrc = Records.newRecord(LocalResource.class);
-    shellRsrc.setType(LocalResourceType.FILE);
-    shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    shellRsrc.setResource(
-        ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
-    shellRsrc.setTimestamp(shellScriptPathTimestamp);
-    shellRsrc.setSize(shellScriptPathLen);
-    localResources.put("MyExecShell.sh", shellRsrc);
-
-    ctx.setLocalResources(localResources);
-
-    // Set the necessary command to execute on the allocated container
-    String command = "/bin/sh ./MyExecShell.sh"
-        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
-        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
-
-    List<String> commands = new ArrayList<String>();
-    commands.add(command);
-    ctx.setCommands(commands);
-
-    // Send the start request to the ContainerManager
-    StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
-    startReq.setContainerLaunchContext(ctx);
-    cm.startContainer(startReq);
-+---+

-  * The ApplicationMaster, as mentioned previously, will get updates of
-    completed containers as part of the response from the ApplicationMasterProtocol#allocate
-    calls. It can also monitor its launched containers pro-actively by querying
-    the ContainerManager for the status.

-+---+
-    GetContainerStatusRequest statusReq =
-        Records.newRecord(GetContainerStatusRequest.class);
-    statusReq.setContainerId(container.getId());
-    GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
-    LOG.info("Container Status"
-        + ", id=" + container.getId()
-        + ", status=" + statusResp.getStatus());
-+---+

+  * On heartbeat, the event handler reports the progress of the application.

+---+
+  @Override
+  public float getProgress() {
+    // set progress to deliver to RM on next heartbeat
+    float progress = (float) numCompletedContainers.get()
+        / numTotalContainers;
+    return progress;
+  }
+---+

+  []

+  * The container launch thread actually launches the containers on NMs. After a
+    container has been allocated to the AM, it needs to follow a similar process
+    that the client followed in setting up the <<<ContainerLaunchContext>>> for
+    the eventual task that is going to be running on the allocated Container.
+    Once the <<<ContainerLaunchContext>>> is defined, the AM can start it through
+    the <<<NMClientAsync>>> object.

+---+
+  // Set the necessary command to execute on the allocated container
+  Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+
+  // Set executable command
+  vargs.add(shellCommand);
+  // Set shell script path
+  if (!scriptPath.isEmpty()) {
+    vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
+        : ExecShellStringPath);
+  }
+
+  // Set args for the shell command if any
+  vargs.add(shellArgs);
+  // Add log redirect params
+  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+  // Get final command
+  StringBuilder command = new StringBuilder();
+  for (CharSequence str : vargs) {
+    command.append(str).append(" ");
+  }
+
+  List<String> commands = new ArrayList<String>();
+  commands.add(command.toString());
+
+  // Set up ContainerLaunchContext, setting local resource, environment,
+  // command and token for constructor.
+
+  // Note for tokens: Set up tokens for the container too. Today, for normal
+  // shell commands, the container in distributed-shell doesn't need any
+  // tokens. We are populating them mainly for NodeManagers to be able to
+  // download any files in the distributed file-system. The tokens are
+  // otherwise also useful in cases, for e.g., when one is running a
+  // "hadoop dfs" command inside the distributed shell.
+  ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
+      localResources, shellEnv, commands, null, allTokens.duplicate(), null);
+  containerListener.addContainer(container.getId(), container);
+  nmClientAsync.startContainerAsync(container, ctx);
+---+
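  The <<<containerListener>>> passed to <<<startContainerAsync>>> above is an
  <<<NMClientAsync.CallbackHandler>>>. A skeleton of such a handler, along the
  lines of the one in the distributed shell sample, could be:

+---+
  private class NMCallbackHandler implements NMClientAsync.CallbackHandler {

    // Bookkeeping of containers handed to this handler via addContainer().
    private final ConcurrentMap<ContainerId, Container> containers =
        new ConcurrentHashMap<ContainerId, Container>();

    public void addContainer(ContainerId containerId, Container container) {
      containers.putIfAbsent(containerId, container);
    }

    @Override
    public void onContainerStarted(ContainerId containerId,
        Map<String, ByteBuffer> allServiceResponse) {
      // The container process has been launched on its NodeManager.
    }

    @Override
    public void onContainerStatusReceived(ContainerId containerId,
        ContainerStatus containerStatus) {}

    @Override
    public void onContainerStopped(ContainerId containerId) {
      containers.remove(containerId);
    }

    @Override
    public void onStartContainerError(ContainerId containerId, Throwable t) {
      // Treat the container as failed and drop the bookkeeping entry.
      containers.remove(containerId);
    }

    @Override
    public void onGetContainerStatusError(ContainerId containerId, Throwable t) {}

    @Override
    public void onStopContainerError(ContainerId containerId, Throwable t) {
      containers.remove(containerId);
    }
  }
+---+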
+  * The <<<NMClientAsync>>> object, together with its event handler, handles
+    container events, including container start, stop, status update, and
+    errors.
+
+  * After the ApplicationMaster determines the work is done, it needs to
+    unregister itself through the AM-RM client, and then stop the client.

+---+
+  try {
+    amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+  } catch (YarnException ex) {
+    LOG.error("Failed to unregister application", ex);
+  } catch (IOException e) {
+    LOG.error("Failed to unregister application", e);
+  }
+
+  amRMClient.stop();
+---+

~~** Defining the context in which your code runs

-~~*** Container Resource Requests
+~~*** Container Resource Requests

-~~*** Local Resources
+~~*** Local Resources

-~~*** Environment
+~~*** Environment

-~~**** Managing the CLASSPATH
+~~**** Managing the CLASSPATH

-~~** Security
+~~** Security

-* FAQ
+* FAQ

-** How can I distribute my application's jars to all of the nodes in the YARN
+** How can I distribute my application's jars to all of the nodes in the YARN
   cluster that need it?

-  You can use the LocalResource to add resources to your application request.
-  This will cause YARN to distribute the resource to the ApplicationMaster node.
-  If the resource is a tgz, zip, or jar - you can have YARN unzip it. Then, all
-  you need to do is add the unzipped folder to your classpath.
-  For example, when creating your application request:

+  * You can use the LocalResource to add resources to your application request.
+    This will cause YARN to distribute the resource to the ApplicationMaster
+    node. If the resource is a tgz, zip, or jar - you can have YARN unzip it.
+    Then, all you need to do is add the unzipped folder to your classpath.
For + example, when creating your application request: +---+ - File packageFile = new File(packagePath); - Url packageUrl = ConverterUtils.getYarnUrlFromPath( - FileContext.getFileContext.makeQualified(new Path(packagePath))); + File packageFile = new File(packagePath); + Url packageUrl = ConverterUtils.getYarnUrlFromPath( + FileContext.getFileContext.makeQualified(new Path(packagePath))); - packageResource.setResource(packageUrl); - packageResource.setSize(packageFile.length()); - packageResource.setTimestamp(packageFile.lastModified()); - packageResource.setType(LocalResourceType.ARCHIVE); - packageResource.setVisibility(LocalResourceVisibility.APPLICATION); + packageResource.setResource(packageUrl); + packageResource.setSize(packageFile.length()); + packageResource.setTimestamp(packageFile.lastModified()); + packageResource.setType(LocalResourceType.ARCHIVE); + packageResource.setVisibility(LocalResourceVisibility.APPLICATION); - resource.setMemory(memory) - containerCtx.setResource(resource) - containerCtx.setCommands(ImmutableList.of( - "java -cp './package/*' some.class.to.Run " - + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout " - + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")) - containerCtx.setLocalResources( - Collections.singletonMap("package", packageResource)) - appCtx.setApplicationId(appId) - appCtx.setUser(user.getShortUserName) - appCtx.setAMContainerSpec(containerCtx) - request.setApplicationSubmissionContext(appCtx) - applicationsManager.submitApplication(request) + resource.setMemory(memory); + containerCtx.setResource(resource); + containerCtx.setCommands(ImmutableList.of( + "java -cp './package/*' some.class.to.Run " + + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout " + + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")); + containerCtx.setLocalResources( + Collections.singletonMap("package", packageResource)); + appCtx.setApplicationId(appId); + appCtx.setUser(user.getShortUserName); + appCtx.setAMContainerSpec(containerCtx); + yarnClient.submitApplication(appCtx); +---+ - As you can see, the setLocalResources command takes a map of names to - resources. The name becomes a sym link in your application's cwd, so you can - just refer to the artifacts inside by using ./package/*. - - Note: Java's classpath (cp) argument is VERY sensitive. + As you can see, the <<>> command takes a map of names to + resources. The name becomes a sym link in your application's cwd, so you can + just refer to the artifacts inside by using ./package/*. + + Note: Java's classpath (cp) argument is VERY sensitive. Make sure you get the syntax EXACTLY correct. - Once your package is distributed to your ApplicationMaster, you'll need to - follow the same process whenever your ApplicationMaster starts a new container - (assuming you want the resources to be sent to your container). The code for - this is the same. You just need to make sure that you give your - ApplicationMaster the package path (either HDFS, or local), so that it can - send the resource URL along with the container ctx. + Once your package is distributed to your AM, you'll need to follow the same + process whenever your AM starts a new container (assuming you want the + resources to be sent to your container). The code for this is the same. You + just need to make sure that you give your AM the package path (either HDFS, or + local), so that it can send the resource URL along with the container ctx. -** How do I get the ApplicationMaster's ApplicationAttemptId? 
+** How do I get the ApplicationMaster's <<<ApplicationAttemptId>>>?

+  * The <<<ApplicationAttemptId>>> will be passed to the AM via the environment
+    and the value from the environment can be converted into an
+    <<<ApplicationAttemptId>>> object via the ConverterUtils helper function.

-  The ApplicationAttemptId will be passed to the ApplicationMaster via the
-  environment and the value from the environment can be converted into an
-  ApplicationAttemptId object via the ConverterUtils helper function.

+** Why is my container killed by the NodeManager?

-** My container is being killed by the Node Manager

-  This is likely due to high memory usage exceeding your requested container
-  memory size. There are a number of reasons that can cause this. First, look
-  at the process tree that the node manager dumps when it kills your container.
-  The two things you're interested in are physical memory and virtual memory.
-  If you have exceeded physical memory limits your app is using too much physical
-  memory. If you're running a Java app, you can use -hprof to look at what is
-  taking up space in the heap. If you have exceeded virtual memory, you may
-  need to increase the value of the the cluster-wide configuration variable
-  <<<yarn.nodemanager.vmem-pmem-ratio>>>.

+  * This is likely due to high memory usage exceeding your requested container
+    memory size. There are a number of reasons that can cause this. First, look
+    at the process tree that the NodeManager dumps when it kills your container.
+    The two things you're interested in are physical memory and virtual memory.
+    If you have exceeded physical memory limits your app is using too much
+    physical memory. If you're running a Java app, you can use -hprof to look at
+    what is taking up space in the heap. If you have exceeded virtual memory, you
+    may need to increase the value of the cluster-wide configuration variable
+    <<<yarn.nodemanager.vmem-pmem-ratio>>>.

** How do I include native libraries?

-  Setting -Djava.library.path on the command line while launching a container
-  can cause native libraries used by Hadoop to not be loaded correctly and can
-  result in errors. It is cleaner to use LD_LIBRARY_PATH instead.

+  * Setting <<<-Djava.library.path>>> on the command line while launching a
+    container can cause native libraries used by Hadoop to not be loaded
+    correctly and can result in errors. It is cleaner to use
+    <<<LD_LIBRARY_PATH>>> instead.

* Useful Links

-  * {{{https://issues.apache.org/jira/secure/attachment/12486023/MapReduce_NextGen_Architecture.pdf}Map Reduce Next Generation Architecture}}
+  * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html}YARN Architecture}}

-  * {{{http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/}Map Reduce Next Generation Scheduler}}
+  * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html}YARN Capacity Scheduler}}
+
+  * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html}YARN Fair Scheduler}}
+
+* Sample code
+
+  * Yarn distributed shell: in <<<hadoop-yarn-applications-distributedshell>>>
+    project after you set up your development environment.