diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0fefdde6428..f6a011877f6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -75,6 +75,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2930. Added the ability to be able to generate graphs from the state-machine definitions. (Binglin Chang via vinodkv) + MAPREDUCE-2719. Add a simple, DistributedShell, application to illustrate + alternate frameworks on YARN. (Hitesh Shah via acmurthy) + IMPROVEMENTS MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml new file mode 100644 index 00000000000..406ec436d5a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml @@ -0,0 +1,105 @@ + + + + + hadoop-yarn-applications + org.apache.hadoop + ${yarn.version} + + 4.0.0 + org.apache.hadoop + hadoop-yarn-applications-distributedshell + hadoop-yarn-applications-distributedshell + + + ${project.artifact.file} + ${project.parent.parent.basedir} + + + + + org.apache.hadoop + hadoop-yarn-api + ${yarn.version} + + + org.apache.hadoop + hadoop-yarn-common + ${yarn.version} + + + org.apache.hadoop + hadoop-yarn-server-nodemanager + test + ${yarn.version} + + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + test + ${yarn.version} + + + org.apache.hadoop + hadoop-yarn-server-common + test + ${yarn.version} + + + org.apache.hadoop + hadoop-yarn-server-tests + test-jar + test + ${yarn.version} + + + + + + + maven-jar-plugin + + + + jar + + + test-compile + + + + + maven-dependency-plugin + + + build-classpath + generate-sources + + build-classpath + + + + target/classes/yarn-apps-ds-generated-classpath + + + + + + + + 
+ diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java new file mode 100644 index 00000000000..19800ba91a3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -0,0 +1,831 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.AMRMProtocol; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ContainerManager; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; +//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; + +import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import 
org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; + +/** + * An ApplicationMaster for executing shell commands on a set of launched containers using the YARN framework. + * + *

This class is meant to act as an example of how to write YARN-based application masters.

+ * + *

The ApplicationMaster is started on a container by the ResourceManager's launcher. + * The first thing that the ApplicationMaster needs to do is to connect and register itself with + * the ResourceManager. The registration sets up information within the ResourceManager + * regarding what host:port the ApplicationMaster is listening on to provide any form of functionality to a client + * as well as a tracking url that a client can use to keep track of status/job history if needed.

+ * + *

The ApplicationMaster needs to send a heartbeat to the ResourceManager at regular intervals + * to inform the ResourceManager that it is up and alive. The {@link AMRMProtocol#allocate} call to the + * ResourceManager from the ApplicationMaster acts as a heartbeat. + * + *

For the actual handling of the job, the ApplicationMaster has to ask the + * ResourceManager via {@link AllocateRequest} for the required no. of containers using {@link ResourceRequest} + * with the necessary resource specifications such as node location and computational (memory/disk/cpu) resource requirements. + * The ResourceManager responds with an {@link AllocateResponse} that informs the ApplicationMaster + * of the set of newly allocated containers, completed containers as well as the current state of available resources.

+ * + *

For each allocated container, the ApplicationMaster can then set up the necessary launch context via + * {@link ContainerLaunchContext} to specify the allocated container id, local resources required by the executable, + * the environment to be set up for the executable, commands to execute, etc. and submit a {@link StartContainerRequest} + * to the {@link ContainerManager} to launch and execute the defined commands on the given allocated container.

+ * + *

The ApplicationMaster can monitor the launched container either by querying the ResourceManager + * using {@link AMRMProtocol#allocate} to get updates on completed containers or by querying the {@link ContainerManager} + * for the status of the allocated container's {@link ContainerId}. + * + *

After the job has been completed, the ApplicationMaster has to send a {@link FinishApplicationMasterRequest} + * to the ResourceManager to inform it that the ApplicationMaster has been completed. + */ +public class ApplicationMaster { + + private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); + + // Configuration + private Configuration conf; + // YARN RPC to communicate with the Resource Manager or Node Manager + private YarnRPC rpc; + + // Handle to communicate with the Resource Manager + private AMRMProtocol resourceManager; + + // Application Attempt Id ( combination of attemptId and fail count ) + private ApplicationAttemptId appAttemptID; + + // TODO + // For status update for clients - yet to be implemented + // Hostname of the container + private String appMasterHostname = ""; + // Port on which the app master listens for status update requests from clients + private int appMasterRpcPort = 0; + // Tracking url to which app master publishes info for clients to monitor + private String appMasterTrackingUrl = ""; + + // App Master configuration + // No. 
of containers to run shell command on + private int numTotalContainers = 1; + // Memory to request for the container on which the shell command will run + private int containerMemory = 10; + // Priority of the request + private int requestPriority; + + // Incremental counter for rpc calls to the RM + private AtomicInteger rmRequestID = new AtomicInteger(); + + // Simple flag to denote whether all works is done + private boolean appDone = false; + // Counter for completed containers ( complete denotes successful or failed ) + private AtomicInteger numCompletedContainers = new AtomicInteger(); + // Allocated container count so that we know how many containers has the RM + // allocated to us + private AtomicInteger numAllocatedContainers = new AtomicInteger(); + // Count of failed containers + private AtomicInteger numFailedContainers = new AtomicInteger(); + // Count of containers already requested from the RM + // Needed as once requested, we should not request for containers again and again. + // Only request for more if the original requirement changes. 
+ private AtomicInteger numRequestedContainers = new AtomicInteger(); + + // Shell command to be executed + private String shellCommand = ""; + // Args to be passed to the shell command + private String shellArgs = ""; + // Env variables to be setup for the shell command + private Map shellEnv = new HashMap(); + + // Location of shell script ( obtained from info set in env ) + // Shell script path in fs + private String shellScriptPath = ""; + // Timestamp needed for creating a local resource + private long shellScriptPathTimestamp = 0; + // File length needed for local resource + private long shellScriptPathLen = 0; + + // Hardcoded path to shell script in launch container's local env + private final String ExecShellStringPath = "ExecShellScript.sh"; + + // Containers to be released + private CopyOnWriteArrayList releasedContainers = new CopyOnWriteArrayList(); + + // Launch threads + private List launchThreads = new ArrayList(); + + /** + * @param args Command line args + */ + public static void main(String[] args) { + boolean result = false; + try { + ApplicationMaster appMaster = new ApplicationMaster(); + LOG.info("Initializing ApplicationMaster"); + boolean doRun = appMaster.init(args); + if (!doRun) { + System.exit(0); + } + result = appMaster.run(); + } catch (Throwable t) { + LOG.fatal("Error running ApplicationMaster", t); + System.exit(1); + } + if (result) { + LOG.info("Application Master completed successfully. exiting"); + System.exit(0); + } + else { + LOG.info("Application Master failed. 
exiting"); + System.exit(2); + } + } + + /** + * Dump out contents of $CWD and the environment to stdout for debugging + */ + private void dumpOutDebugInfo() { + + LOG.info("Dump debug output"); + Map envs = System.getenv(); + for (Map.Entry env : envs.entrySet()) { + LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); + System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue()); + } + + String cmd = "ls -al"; + Runtime run = Runtime.getRuntime(); + Process pr = null; + try { + pr = run.exec(cmd); + pr.waitFor(); + + BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream())); + String line = ""; + while ((line=buf.readLine())!=null) { + LOG.info("System CWD content: " + line); + System.out.println("System CWD content: " + line); + } + buf.close(); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public ApplicationMaster() throws Exception { + // Set up the configuration and RPC + conf = new Configuration(); + rpc = YarnRPC.create(conf); + } + /** + * Parse command line options + * @param args Command line args + * @return Whether init successful and run should be invoked + * @throws ParseException + * @throws IOException + */ + public boolean init(String[] args) throws ParseException, IOException { + + Options opts = new Options(); + opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes"); + opts.addOption("shell_command", true, "Shell command to be executed by the Application Master"); + opts.addOption("shell_script", true, "Location of the shell script to be executed"); + opts.addOption("shell_args", true, "Command line args for the shell script"); + opts.addOption("shell_env", true, "Environment for shell script. 
Specified as env_key=env_val pairs"); + opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); + opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); + opts.addOption("priority", true, "Application Priority. Default 0"); + opts.addOption("debug", false, "Dump out debug information"); + + opts.addOption("help", false, "Print usage"); + CommandLine cliParser = new GnuParser().parse(opts, args); + + if (args.length == 0) { + printUsage(opts); + throw new IllegalArgumentException("No args specified for application master to initialize"); + } + + if (cliParser.hasOption("help")) { + printUsage(opts); + return false; + } + + if (cliParser.hasOption("debug")) { + dumpOutDebugInfo(); + } + + Map envs = System.getenv(); + + appAttemptID = Records.newRecord(ApplicationAttemptId.class); + if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) { + if (cliParser.hasOption("app_attempt_id")) { + String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); + appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); + } + else { + throw new IllegalArgumentException("Application Attempt Id not set in the environment"); + } + } else { + appAttemptID = ConverterUtils.toApplicationAttemptId(envs.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)); + } + + LOG.info("Application master for app" + + ", appId=" + appAttemptID.getApplicationId().getId() + + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + + ", attemptId=" + appAttemptID.getAttemptId()); + + if (!cliParser.hasOption("shell_command")) { + throw new IllegalArgumentException("No shell command specified to be executed by application master"); + } + shellCommand = cliParser.getOptionValue("shell_command"); + + if (cliParser.hasOption("shell_args")) { + shellArgs = cliParser.getOptionValue("shell_args"); + } + if (cliParser.hasOption("shell_env")) { + String 
shellEnvs[] = cliParser.getOptionValues("shell_env"); + for (String env : shellEnvs) { + env = env.trim(); + int index = env.indexOf('='); + if (index == -1) { + shellEnv.put(env, ""); + continue; + } + String key = env.substring(0, index); + String val = ""; + if (index < (env.length()-1)) { + val = env.substring(index+1); + } + shellEnv.put(key, val); + } + } + + if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { + shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); + + if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { + shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); + } + if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { + shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); + } + + if (!shellScriptPath.isEmpty() + && (shellScriptPathTimestamp <= 0 + || shellScriptPathLen <= 0)) { + LOG.error("Illegal values in env for shell script path" + + ", path=" + shellScriptPath + + ", len=" + shellScriptPathLen + + ", timestamp=" + shellScriptPathTimestamp); + throw new IllegalArgumentException("Illegal values in env for shell script path"); + } + } + + containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); + numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); + requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); + + return true; + } + + /** + * Helper function to print usage + * @param opts Parsed command line options + */ + private void printUsage(Options opts) { + new HelpFormatter().printHelp("ApplicationMaster", opts); + } + + /** + * Main run function for the application master + * @throws YarnRemoteException + */ + public boolean run() throws YarnRemoteException { + LOG.info("Starting ApplicationMaster"); + + // Connect to ResourceManager + resourceManager = connectToRM(); + + // Setup local RPC Server to accept status 
requests directly from clients + // TODO need to setup a protocol for client to be able to communicate to the RPC server + // TODO use the rpc port info to register with the RM for the client to send requests to this app master + + // Register self with ResourceManager + RegisterApplicationMasterResponse response = registerToRM(); + // Dump out information about cluster capability as seen by the resource manager + int minMem = response.getMinimumResourceCapability().getMemory(); + int maxMem = response.getMaximumResourceCapability().getMemory(); + LOG.info("Min mem capabililty of resources in this cluster " + minMem); + LOG.info("Max mem capabililty of resources in this cluster " + maxMem); + + // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be + // a multiple of the min value and cannot exceed the max. + // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min + if (containerMemory < minMem) { + LOG.info("Container memory specified below min threshold of cluster. Using min value." + + ", specified=" + containerMemory + + ", min=" + minMem); + containerMemory = minMem; + } + else if (containerMemory > maxMem) { + LOG.info("Container memory specified above max threshold of cluster. Using max value." + + ", specified=" + containerMemory + + ", max=" + maxMem); + containerMemory = maxMem; + } + + // Setup heartbeat emitter + // TODO poll RM every now and then with an empty request to let RM know that we are alive + // The heartbeat interval after which an AM is timed out by the RM is defined by a config setting: + // RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS + // The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter + // is not required. 
+ + // Setup ask for containers from RM + // Send request for containers to RM + // Until we get our fully allocated quota, we keep on polling RM for containers + // Keep looping until all the containers are launched and shell script executed on them + // ( regardless of success/failure). + + int loopCounter = -1; + + while (numCompletedContainers.get() < numTotalContainers + && !appDone) { + loopCounter++; + + // log current state + LOG.info("Current application state: loop=" + loopCounter + + ", appDone=" + appDone + + ", total=" + numTotalContainers + + ", requested=" + numRequestedContainers + + ", completed=" + numCompletedContainers + + ", failed=" + numFailedContainers + + ", currentAllocated=" + numAllocatedContainers); + + // Sleep before each loop when asking RM for containers + // to avoid flooding RM with spurious requests when it + // need not have any available containers + // Sleeping for 1000 ms. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted " + e.getMessage()); + } + + // No. of containers to request + // For the first loop, askCount will be equal to total containers needed + // From that point on, askCount will always be 0 as current implementation + // does not change its ask on container failures. 
+ int askCount = numTotalContainers - numRequestedContainers.get(); + numRequestedContainers.addAndGet(askCount); + + // Setup request to be sent to RM to allocate containers + List resourceReq = new ArrayList(); + if (askCount > 0) { + ResourceRequest containerAsk = setupContainerAskForRM(askCount); + resourceReq.add(containerAsk); + } + + // Send the request to RM + LOG.info("Asking RM for containers" + + ", askCount=" + askCount); + AMResponse amResp = sendContainerAskToRM(resourceReq); + + // Retrieve list of allocated containers from the response + List allocatedContainers = amResp.getAllocatedContainers(); + LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); + numAllocatedContainers.addAndGet(allocatedContainers.size()); + for (Container allocatedContainer : allocatedContainers) { + LOG.info("Launching shell command on a new container." + + ", containerId=" + allocatedContainer.getId() + + ", containerNode=" + allocatedContainer.getNodeId().getHost() + + ":" + allocatedContainer.getNodeId().getPort() + + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + + ", containerState" + allocatedContainer.getState() + + ", containerResourceMemory" + allocatedContainer.getResource().getMemory()); + // + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString()); + + LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer); + Thread launchThread = new Thread(runnableLaunchContainer); + + // launch and start the container on a separate thread to keep the main thread unblocked + // as all containers may not be allocated at one go. + launchThreads.add(launchThread); + launchThread.start(); + } + + // Check what the current available resources in the cluster are + // TODO should we do anything if the available resources are not enough? 
+ Resource availableResources = amResp.getAvailableResources(); + LOG.info("Current available resources in the cluster " + availableResources); + + // Check the completed containers + List completedContainers = amResp.getCompletedContainersStatuses(); + LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size()); + for (ContainerStatus containerStatus : completedContainers) { + LOG.info("Got container status for containerID= " + containerStatus.getContainerId() + + ", state=" + containerStatus.getState() + + ", exitStatus=" + containerStatus.getExitStatus() + + ", diagnostics=" + containerStatus.getDiagnostics()); + + // non complete containers should not be here + assert(containerStatus.getState() == ContainerState.COMPLETE); + + // increment counters for completed/failed containers + int exitStatus = containerStatus.getExitStatus(); + if (0 != exitStatus) { + // container failed + if (-100 != exitStatus) { + // shell script failed + // counts as completed + numCompletedContainers.incrementAndGet(); + numFailedContainers.incrementAndGet(); + } + else { + // something else bad happened + // app job did not complete for some reason + // we should re-try as the container was lost for some reason + numAllocatedContainers.decrementAndGet(); + numRequestedContainers.decrementAndGet(); + // we do not need to release the container as it would be done + // by the RM/CM. + } + } + else { + // nothing to do + // container completed successfully + numCompletedContainers.incrementAndGet(); + LOG.info("Container completed successfully." 
+ + ", containerId=" + containerStatus.getContainerId()); + } + + } + if (numCompletedContainers.get() == numTotalContainers) { + appDone = true; + } + + LOG.info("Current application state: loop=" + loopCounter + + ", appDone=" + appDone + + ", total=" + numTotalContainers + + ", requested=" + numRequestedContainers + + ", completed=" + numCompletedContainers + + ", failed=" + numFailedContainers + + ", currentAllocated=" + numAllocatedContainers); + + // TODO + // Add a timeout handling layer + // for misbehaving shell commands + } + + // Join all launched threads + // needed for when we time out + // and we need to release containers + for (Thread launchThread : launchThreads) { + try { + launchThread.join(10000); + } catch (InterruptedException e) { + LOG.info("Exception thrown in thread join: " + e.getMessage()); + e.printStackTrace(); + } + } + + // When the application completes, it should send a finish application signal + // to the RM + LOG.info("Application completed. Signalling finish to RM"); + + FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setAppAttemptId(appAttemptID); + boolean isSuccess = true; + if (numFailedContainers.get() == 0) { + finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); + } + else { + finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); + String diagnostics = "Diagnostics." + + ", total=" + numTotalContainers + + ", completed=" + numCompletedContainers.get() + + ", allocated=" + numAllocatedContainers.get() + + ", failed=" + numFailedContainers.get(); + finishReq.setDiagnostics(diagnostics); + isSuccess = false; + } + resourceManager.finishApplicationMaster(finishReq); + return isSuccess; + } + + /** + * Thread to connect to the {@link ContainerManager} and + * launch the container that will execute the shell command. 
+ */ + private class LaunchContainerRunnable implements Runnable { + + // Allocated container + Container container; + // Handle to communicate with ContainerManager + ContainerManager cm; + + /** + * @param lcontainer Allocated container + */ + public LaunchContainerRunnable(Container lcontainer) { + this.container = lcontainer; + } + + /** + * Helper function to connect to CM + */ + private void connectToCM() { + String cmIpPortStr = container.getNodeId().getHost() + ":" + + container.getNodeId().getPort(); + InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); + LOG.info("Connecting to ResourceManager at " + cmIpPortStr); + this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf)); + } + + + @Override + /** + * Connects to CM, sets up container launch context + * for shell command and eventually dispatches the container + * start request to the CM. + */ + public void run() { + // Connect to ContainerManager + LOG.info("Connecting to container manager for containerid=" + container.getId()); + connectToCM(); + + LOG.info("Setting up container launch container for containerid=" + container.getId()); + ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); + + ctx.setContainerId(container.getId()); + ctx.setResource(container.getResource()); + + try { + ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); + } catch (IOException e) { + LOG.info("Getting current user info failed when trying to launch the container" + + e.getMessage()); + } + + // Set the environment + ctx.setEnvironment(shellEnv); + + // Set the local resources + Map localResources = new HashMap(); + + // The container for the eventual shell commands needs its own local resources too. + // In this scenario, if a shell script is specified, we need to have it copied + // and made available to the container. 
+ if (!shellScriptPath.isEmpty()) { + LocalResource shellRsrc = Records.newRecord(LocalResource.class); + shellRsrc.setType(LocalResourceType.FILE); + shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + try { + shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath))); + } catch (URISyntaxException e) { + LOG.error("Error when trying to use shell script path specified in env" + + ", path=" + shellScriptPath); + e.printStackTrace(); + + // A failure scenario on bad input such as invalid shell script path + // We know we cannot continue launching the container + // so we should release it. + // TODO + numCompletedContainers.incrementAndGet(); + numFailedContainers.incrementAndGet(); + return; + } + shellRsrc.setTimestamp(shellScriptPathTimestamp); + shellRsrc.setSize(shellScriptPathLen); + localResources.put(ExecShellStringPath, shellRsrc); + } + ctx.setLocalResources(localResources); + + // Set the necessary command to execute on the allocated container + Vector vargs = new Vector(5); + + // Set executable command + vargs.add(shellCommand); + // Set shell script path + if (!shellScriptPath.isEmpty()) { + vargs.add(ExecShellStringPath); + } + + // Set args for the shell command if any + vargs.add(shellArgs); + // Add log redirect params + // TODO + // We should redirect the output to hdfs instead of local logs + // so as to be able to look at the final output after the containers + // have been released. 
+      // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err]
+      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+      // Get final commmand
+      StringBuilder command = new StringBuilder();
+      for (CharSequence str : vargs) {
+        command.append(str).append(" ");
+      }
+
+      List commands = new ArrayList();
+      commands.add(command.toString());
+      ctx.setCommands(commands);
+
+      StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+      startReq.setContainerLaunchContext(ctx);
+      try {
+        cm.startContainer(startReq);
+      } catch (YarnRemoteException e) {
+        LOG.info("Start container failed for :"
+            + ", containerId=" + container.getId());
+        e.printStackTrace();
+        // TODO do we need to release this container?
+      }
+
+      // Get container status?
+      // Left commented out as the shell scripts are short lived
+      // and we are relying on the status for completed containers from RM to detect status
+
+      //    GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
+      //    statusReq.setContainerId(container.getId());
+      //    GetContainerStatusResponse statusResp;
+      //    try {
+      //      statusResp = cm.getContainerStatus(statusReq);
+      //      LOG.info("Container Status"
+      //          + ", id=" + container.getId()
+      //          + ", status=" +statusResp.getStatus());
+      //    } catch (YarnRemoteException e) {
+      //      e.printStackTrace();
+      //    }
+    }
+  }
+
+  /**
+   * Connect to the Resource Manager.
+   * The scheduler address is read from the YARN configuration
+   * ({@code YarnConfiguration.RM_SCHEDULER_ADDRESS}, falling back to
+   * {@code YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS}) and an RPC proxy
+   * for {@link AMRMProtocol} is created against it.
+   * @return Handle to communicate with the RM
+   */
+  private AMRMProtocol connectToRM() {
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
+  }
+
+  /**
+   * Register the Application Master to the Resource Manager.
+   * The request carries the application attempt id, the host the app master
+   * runs on, its rpc port and its tracking url.
+   * @return the registration response from the RM
+   * @throws YarnRemoteException propagated from the registration call to the RM
+   */
+  private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
+    RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
+
+    // set the required info into the registration request:
+    // application attempt id,
+    // host on which the app master is running
+    // rpc port on which the app master accepts requests from the client
+    // tracking url for the app master
+    appMasterRequest.setApplicationAttemptId(appAttemptID);
+    appMasterRequest.setHost(appMasterHostname);
+    appMasterRequest.setRpcPort(appMasterRpcPort);
+    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+
+    return resourceManager.registerApplicationMaster(appMasterRequest);
+  }
+
+  /**
+   * Setup the request that will be sent to the RM for the container ask.
+   * The request targets any host ("*"), uses the configured request priority
+   * and asks for the configured amount of container memory.
+   * @param numContainers Containers to ask for from RM
+   * @return the setup ResourceRequest to be sent to RM
+   */
+  private ResourceRequest setupContainerAskForRM(int numContainers) {
+    ResourceRequest request = Records.newRecord(ResourceRequest.class);
+
+    // setup requirements for hosts
+    // whether a particular rack/host is needed
+    // Refer to apis under org.apache.hadoop.net for more
+    // details on how to get figure out rack/host mapping.
+    // using * as any host will do for the distributed shell app
+    request.setHostName("*");
+
+    // set no. of containers needed
+    request.setNumContainers(numContainers);
+
+    // set the priority for the request
+    Priority pri = Records.newRecord(Priority.class);
+    // TODO - what is the range for priority? how to decide?
+    pri.setPriority(requestPriority);
+    request.setPriority(pri);
+
+    // Set up resource type requirements
+    // For now, only memory is supported so we set memory requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(containerMemory);
+    request.setCapability(capability);
+
+    return request;
+  }
+
+  /**
+   * Ask RM to allocate given no. of containers to this Application Master.
+   * The same request also reports the containers released so far and the
+   * current progress (completed containers / total containers).
+   * @param requestedContainers Containers to ask for from RM
+   * @return Response from RM to AM with allocated containers
+   * @throws YarnRemoteException propagated from the allocate call to the RM
+   */
+  private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
+      throws YarnRemoteException {
+    AllocateRequest req = Records.newRecord(AllocateRequest.class);
+    req.setResponseId(rmRequestID.incrementAndGet());
+    req.setApplicationAttemptId(appAttemptID);
+    req.addAllAsks(requestedContainers);
+    req.addAllReleases(releasedContainers);
+    // Guard the ratio: with zero total containers the float division
+    // would report NaN or Infinity as progress
+    float progress = (numTotalContainers == 0)
+        ? 0f : (float) numCompletedContainers.get() / numTotalContainers;
+    req.setProgress(progress);
+
+    LOG.info("Sending request to RM for containers"
+        + ", requestedSet=" + requestedContainers.size()
+        + ", releasedSet=" + releasedContainers.size()
+        + ", progress=" + req.getProgress());
+
+    for (ResourceRequest rsrcReq : requestedContainers) {
+      LOG.info("Requested container ask: " + rsrcReq.toString());
+    }
+    for (ContainerId id : releasedContainers) {
+      LOG.info("Released container, id=" + id.getId());
+    }
+
+    AllocateResponse resp = resourceManager.allocate(req);
+    return resp.getAMResponse();
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
new file mode 100644
index 00000000000..63be4cbcc50
--- /dev/null
+++ 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -0,0 +1,791 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityInfo; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; + + +/** + * Client for Distributed Shell application submission to YARN. + * + *

The distributed shell client allows an application master to be launched that in turn would run + * the provided shell command on a set of containers.

+ * + *

This client is meant to act as an example on how to write yarn-based applications.

+ * + *

To submit an application, a client first needs to connect to the ResourceManager + * (also referred to as the ApplicationsManager or ASM) via the {@link ClientRMProtocol}. The {@link ClientRMProtocol} + * provides a way for the client to get access to cluster information and to request a + * new {@link ApplicationId}.

+ * + *

For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. + * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} + * and application name, user submitting the application, the priority assigned to the application and the queue + * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext} + * also defines the {@link ContainerLaunchContext} which describes the Container with which + * the {@link ApplicationMaster} is launched.

+ * + *

The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the + * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available, + * the environment to be set for the {@link ApplicationMaster}, and the commands to be executed to run the + * {@link ApplicationMaster}.

+ * + *

Using the {@link ApplicationSubmissionContext}, the client submits the application to the + * ResourceManager and then monitors the application by requesting an {@link ApplicationReport} + * from the ResourceManager at regular time intervals. If the application takes too long, the client + * kills the application by submitting a {@link KillApplicationRequest} to the ResourceManager.

+ *
+ */
+public class Client {
+
+  private static final Log LOG = LogFactory.getLog(Client.class);
+
+  // Configuration
+  private Configuration conf;
+
+  // RPC to communicate to RM
+  private YarnRPC rpc;
+
+  // Handle to talk to the Resource Manager/Applications Manager
+  private ClientRMProtocol applicationsManager;
+
+  // Application master specific info to register a new Application with RM/ASM
+  private String appName = "";
+  // App master priority
+  private int amPriority = 0;
+  // Queue for App master
+  private String amQueue = "";
+  // User to run app master as
+  private String amUser = "";
+  // Amount of memory (in MB, see the master_memory option) to request for running the App Master
+  private int amMemory = 10;
+
+  // Application master jar file
+  private String appMasterJar = "";
+  // Main class to invoke application master
+  private String appMasterMainClass = "";
+
+  // Shell command to be executed
+  private String shellCommand = "";
+  // Location of shell script
+  private String shellScriptPath = "";
+  // Args to be passed to the shell command
+  private String shellArgs = "";
+  // Env variables to be setup for the shell command, keyed by variable name
+  private Map<String, String> shellEnv = new HashMap<String, String>();
+  // Shell Command Container priority
+  private int shellCmdPriority = 0;
+
+  // Amount of memory (in MB, see the container_memory option) to request for
+  // each container in which the shell script will be executed
+  private int containerMemory = 10;
+  // No. of containers in which the shell script needs to be executed
+  private int numContainers = 1;
+
+  // log4j.properties file
+  // if available, add to local resources and set into classpath
+  private String log4jPropFile = "";
+
+  // Start time for client
+  private final long clientStartTime = System.currentTimeMillis();
+  // Timeout threshold for client. Kill app after time interval expires.
+  private long clientTimeout = 600000;
+
+  // Debug flag
+  boolean debugFlag = false;
+
+  /**
+   * Command line entry point for the client.
+   * Exits with status 0 if only usage was requested or the application
+   * completed successfully, 1 if any error was thrown while initializing or
+   * running the client, and 2 if the application did not complete successfully.
+   * @param args Command line arguments
+   */
+  public static void main(String[] args) {
+    boolean result = false;
+    try {
+      Client client = new Client();
+      LOG.info("Initializing Client");
+      boolean doRun = client.init(args);
+      if (!doRun) {
+        System.exit(0);
+      }
+      result = client.run();
+    } catch (Throwable t) {
+      LOG.fatal("Error running Client", t);
+      System.exit(1);
+    }
+    if (result) {
+      LOG.info("Application completed successfully");
+      System.exit(0);
+    }
+    LOG.error("Application failed to complete successfully");
+    System.exit(2);
+  }
+
+  /**
+   * Create a client with a fresh {@link Configuration} and the
+   * {@link YarnRPC} instance created from it.
+   */
+  public Client() throws Exception {
+    // Set up the configuration and RPC
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
+  }
+
+  /**
+   * Helper function to print out usage
+   * @param opts Command line options whose help text is to be printed
+   */
+  private void printUsage(Options opts) {
+    new HelpFormatter().printHelp("Client", opts);
+  }
+
+  /**
+   * Parse the command line options and populate the client's settings.
+   * @param args Command line arguments, as passed to {@link #main(String[])}
+   * @return false if only the usage was requested via -help and the client
+   *         should not be run, true if all settings were accepted
+   * @throws ParseException propagated from the command line parser
+   * @throws IllegalArgumentException if no args are given, the jar or shell
+   *         command option is missing, or a memory / container count value is
+   *         out of range
+   */
+  public boolean init(String[] args) throws ParseException {
+
+    Options opts = new Options();
+    opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
+    opts.addOption("priority", true, "Application Priority. Default 0");
+    opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
+    opts.addOption("user", true, "User to run the application as");
+    opts.addOption("timeout", true, "Application timeout in milliseconds");
+    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
+    opts.addOption("jar", true, "Jar file containing the application master");
+    opts.addOption("class", true, "Main class to be run for the Application Master.");
+    opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
+    opts.addOption("shell_script", true, "Location of the shell script to be executed");
+    opts.addOption("shell_args", true, "Command line args for the shell script");
+    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
+    opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
+    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
+    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
+    opts.addOption("log_properties", true, "log4j.properties file");
+    opts.addOption("debug", false, "Dump out debug information");
+    opts.addOption("help", false, "Print usage");
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+
+    if (args.length == 0) {
+      printUsage(opts);
+      throw new IllegalArgumentException("No args specified for client to initialize");
+    }
+
+    if (cliParser.hasOption("help")) {
+      printUsage(opts);
+      return false;
+    }
+
+    if (cliParser.hasOption("debug")) {
+      debugFlag = true;
+
+    }
+
+    appName = cliParser.getOptionValue("appname", "DistributedShell");
+    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+    amQueue = cliParser.getOptionValue("queue", "");
+    amUser = cliParser.getOptionValue("user", "");
+    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
+
+    if (amMemory < 0) {
+      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
+          + " Specified memory=" + amMemory);
+    }
+
+    if (!cliParser.hasOption("jar")) {
+      throw new IllegalArgumentException("No jar file specified for application master");
+    }
+
+    appMasterJar = cliParser.getOptionValue("jar");
+    appMasterMainClass = cliParser.getOptionValue("class",
+        "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");
+
+    if (!cliParser.hasOption("shell_command")) {
+      throw new IllegalArgumentException("No shell command specified to be executed by application master");
+    }
+    shellCommand = cliParser.getOptionValue("shell_command");
+
+    if (cliParser.hasOption("shell_script")) {
+      shellScriptPath = cliParser.getOptionValue("shell_script");
+    }
+    if (cliParser.hasOption("shell_args")) {
+      shellArgs = cliParser.getOptionValue("shell_args");
+    }
+    if (cliParser.hasOption("shell_env")) {
+      // Each entry is either "key=value" or a bare "key" (stored with an empty value)
+      String envs[] = cliParser.getOptionValues("shell_env");
+      for (String env : envs) {
+        env = env.trim();
+        int index = env.indexOf('=');
+        if (index == -1) {
+          shellEnv.put(env, "");
+          continue;
+        }
+        String key = env.substring(0, index);
+        String val = "";
+        if (index < (env.length()-1)) {
+          val = env.substring(index+1);
+        }
+        shellEnv.put(key, val);
+      }
+    }
+    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
+
+    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
+    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+
+    if (containerMemory < 0 || numContainers < 1) {
+      throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
+          + " Specified containerMemory=" + containerMemory
+          + ", numContainer=" + numContainers);
+    }
+
+    // clientTimeout is a long: parse it as one so that timeouts larger than
+    // Integer.MAX_VALUE milliseconds are accepted instead of rejected
+    clientTimeout = Long.parseLong(cliParser.getOptionValue("timeout", "600000"));
+
+    log4jPropFile = cliParser.getOptionValue("log_properties", "");
+
+    return true;
+  }
+
+  /**
+   * Main run function for the client
+   * @return true if application completed successfully
+   * @throws IOException
+   */
+  public boolean run() throws IOException {
+    LOG.info("Starting Client");
+
+    // Connect to ResourceManager
+    connectToASM();
+    assert(applicationsManager != null);
+
+    // Use ClientRMProtocol handle to general cluster information
+    GetClusterMetricsRequest clusterMetricsReq = Records.newRecord(GetClusterMetricsRequest.class);
+    GetClusterMetricsResponse clusterMetricsResp = applicationsManager.getClusterMetrics(clusterMetricsReq);
+    LOG.info("Got Cluster metric info from ASM"
+        + ", numNodeManagers=" + clusterMetricsResp.getClusterMetrics().getNumNodeManagers());
+
+    GetClusterNodesRequest clusterNodesReq = Records.newRecord(GetClusterNodesRequest.class);
+    GetClusterNodesResponse clusterNodesResp = applicationsManager.getClusterNodes(clusterNodesReq);
+    LOG.info("Got Cluster node info from ASM");
+    for (NodeReport node : clusterNodesResp.getNodeReports()) {
+      LOG.info("Got node report from ASM for"
+          + ", nodeId=" + node.getNodeId()
+          + ", nodeAddress" + node.getHttpAddress()
+          + ", nodeRackName" + node.getRackName()
+          + ", nodeNumContainers" + node.getNumContainers()
+          + ", nodeHealthStatus" + node.getNodeHealthStatus());
+    }
+
+    GetQueueInfoRequest queueInfoReq = Records.newRecord(GetQueueInfoRequest.class);
+    GetQueueInfoResponse queueInfoResp = applicationsManager.getQueueInfo(queueInfoReq);
+    QueueInfo queueInfo = queueInfoResp.getQueueInfo();
+    LOG.info("Queue info"
+        + ", queueName=" + queueInfo.getQueueName()
+        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
+        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+        + ", queueApplicationCount=" + 
queueInfo.getApplications().size() + + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); + + GetQueueUserAclsInfoRequest queueUserAclsReq = Records.newRecord(GetQueueUserAclsInfoRequest.class); + GetQueueUserAclsInfoResponse queueUserAclsResp = applicationsManager.getQueueUserAcls(queueUserAclsReq); + List listAclInfo = queueUserAclsResp.getUserAclsInfoList(); + for (QueueUserACLInfo aclInfo : listAclInfo) { + for (QueueACL userAcl : aclInfo.getUserAcls()) { + LOG.info("User ACL Info for Queue" + + ", queueName=" + aclInfo.getQueueName() + + ", userAcl=" + userAcl.name()); + } + } + + // Get a new application id + GetNewApplicationResponse newApp = getApplication(); + ApplicationId appId = newApp.getApplicationId(); + + // TODO get min/max resource capabilities from RM and change memory ask if needed + // If we do not have min/max, we may not be able to correctly request + // the required resources from the RM for the app master + // Memory ask has to be a multiple of min and less than max. + // Dump out information about cluster capability as seen by the resource manager + int minMem = newApp.getMinimumResourceCapability().getMemory(); + int maxMem = newApp.getMaximumResourceCapability().getMemory(); + LOG.info("Min mem capabililty of resources in this cluster " + minMem); + LOG.info("Max mem capabililty of resources in this cluster " + maxMem); + + // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be + // a multiple of the min value and cannot exceed the max. + // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min + if (amMemory < minMem) { + LOG.info("AM memory specified below min threshold of cluster. Using min value." + + ", specified=" + amMemory + + ", min=" + minMem); + amMemory = minMem; + } + else if (amMemory > maxMem) { + LOG.info("AM memory specified above max threshold of cluster. Using max value." 
+ + ", specified=" + amMemory + + ", max=" + maxMem); + amMemory = maxMem; + } + + // Create launch context for app master + LOG.info("Setting up application submission context for ASM"); + ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); + + // set the application id + appContext.setApplicationId(appId); + // set the application name + appContext.setApplicationName(appName); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + + // set local resources for the application master + // local files or archives as needed + // In this scenario, the jar file for the application master is part of the local resources + Map localResources = new HashMap(); + + LOG.info("Copy App Master jar from local filesystem and add to local environment"); + // Copy the application master jar to the filesystem + // Create a local resource to point to the destination jar path + FileSystem fs = FileSystem.get(conf); + Path src = new Path(appMasterJar); + String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar"; + Path dst = new Path(fs.getHomeDirectory(), pathSuffix); + fs.copyFromLocalFile(false, true, src, dst); + FileStatus destStatus = fs.getFileStatus(dst); + LocalResource amJarRsrc = Records.newRecord(LocalResource.class); + + // Set the type of resource - file or archive + // archives are untarred at destination + // we don't need the jar file to be untarred for now + amJarRsrc.setType(LocalResourceType.FILE); + // Set visibility of the resource + // Setting to most private option + amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + // Set the resource to be copied over + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); + // Set timestamp and length of file so that the framework + // can do basic sanity checks for the local resource + // after it has been copied over to ensure it is the same + 
// resource the client intended to use with the application + amJarRsrc.setTimestamp(destStatus.getModificationTime()); + amJarRsrc.setSize(destStatus.getLen()); + localResources.put("AppMaster.jar", amJarRsrc); + + // Set the log4j properties if needed + if (!log4jPropFile.isEmpty()) { + Path log4jSrc = new Path(log4jPropFile); + Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props"); + fs.copyFromLocalFile(false, true, log4jSrc, log4jDst); + FileStatus log4jFileStatus = fs.getFileStatus(log4jDst); + LocalResource log4jRsrc = Records.newRecord(LocalResource.class); + log4jRsrc.setType(LocalResourceType.FILE); + log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri())); + log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime()); + log4jRsrc.setSize(log4jFileStatus.getLen()); + localResources.put("log4j.properties", log4jRsrc); + } + + // The shell script has to be made available on the final container(s) + // where it will be executed. + // To do this, we need to first copy into the filesystem that is visible + // to the yarn framework. + // We do not need to set this as a local resource for the application + // master as the application master does not need it. 
+ String hdfsShellScriptLocation = ""; + long hdfsShellScriptLen = 0; + long hdfsShellScriptTimestamp = 0; + if (!shellScriptPath.isEmpty()) { + Path shellSrc = new Path(shellScriptPath); + String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh"; + Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix); + fs.copyFromLocalFile(false, true, shellSrc, shellDst); + hdfsShellScriptLocation = shellDst.toUri().toString(); + FileStatus shellFileStatus = fs.getFileStatus(shellDst); + hdfsShellScriptLen = shellFileStatus.getLen(); + hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); + } + + // Set local resource info into app master container launch context + amContainer.setLocalResources(localResources); + + // Set the necessary security tokens as needed + //amContainer.setContainerTokens(containerToken); + + // Set the env variables to be setup in the env where the application master will be run + LOG.info("Set the environment for the application master"); + Map env = new HashMap(); + + // put location of shell script into env + // using the env info, the application master will create the correct local resource for the + // eventual containers that will be launched to execute the shell scripts + env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); + env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); + env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); + + // Add AppMaster.jar location to classpath + // At some point we should not be required to add + // the hadoop specific classpaths to the env. + // It should be provided out of the box. + // For now setting all required classpaths including + // the classpath to "." 
for the application jar + String classPathEnv = "${CLASSPATH}" + + ":./*" + + ":$HADOOP_CONF_DIR" + + ":$HADOOP_COMMON_HOME/share/hadoop/common/*" + + ":$HADOOP_COMMON_HOME/share/hadoop/common/lib/*" + + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/*" + + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*" + + ":$YARN_HOME/modules/*" + + ":$YARN_HOME/lib/*" + + ":./log4j.properties:"; + + // add the runtime classpath needed for tests to work + String testRuntimeClassPath = Client.getTestRuntimeClasspath(); + classPathEnv += ":" + testRuntimeClassPath; + + env.put("CLASSPATH", classPathEnv); + + amContainer.setEnvironment(env); + + // Set the necessary command to execute the application master + Vector vargs = new Vector(30); + + // Set java executable command + LOG.info("Setting up app master command"); + vargs.add("${JAVA_HOME}" + "/bin/java"); + // Set class name + vargs.add(appMasterMainClass); + // Set params for Application Master + vargs.add("--container_memory " + String.valueOf(containerMemory)); + vargs.add("--num_containers " + String.valueOf(numContainers)); + vargs.add("--priority " + String.valueOf(shellCmdPriority)); + if (!shellCommand.isEmpty()) { + vargs.add("--shell_command " + shellCommand + ""); + } + if (!shellArgs.isEmpty()) { + vargs.add("--shell_args " + shellArgs + ""); + } + for (Map.Entry entry : shellEnv.entrySet()) { + vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); + } + if (debugFlag) { + vargs.add("--debug"); + } + + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); + + // Get final commmand + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(" "); + } + + LOG.info("Completed setting up app master command " + command.toString()); + List commands = new ArrayList(); + commands.add(command.toString()); + amContainer.setCommands(commands); + + // For 
launching an AM Container, setting user here is not needed + // Set user in ApplicationSubmissionContext + // amContainer.setUser(amUser); + + // Set up resource type requirements + // For now, only memory is supported so we set memory requirements + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(amMemory); + amContainer.setResource(capability); + + // Service data is a binary blob that can be passed to the application + // Not needed in this scenario + // amContainer.setServiceData(serviceData); + + // The following are not required for launching an application master + // amContainer.setContainerId(containerId); + + appContext.setAMContainerSpec(amContainer); + + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + // TODO - what is the range for priority? how to decide? + pri.setPriority(amPriority); + appContext.setPriority(pri); + + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue(amQueue); + // Set the user submitting this application + // TODO can it be empty? + appContext.setUser(amUser); + + // Create the request to send to the applications manager + SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + + // Submit the application to the applications manager + // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); + // Ignore the response as either a valid response object is returned on success + // or an exception thrown to denote some form of a failure + LOG.info("Submitting application to ASM"); + applicationsManager.submitApplication(appRequest); + + // TODO + // Try submitting the same request again + // app submission failure? + + // Monitor the application + return monitorApplication(appId); + + } + + /** + * Monitor the submitted application for completion. + * Kill application if time expires. 
+ * @param appId Application Id of application to be monitored + * @return true if application completed successfully + * @throws YarnRemoteException + */ + private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException { + + while (true) { + + // Check app status every 1 second. + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Thread sleep in monitoring loop interrupted"); + } + + // Get application report for the appId we are interested in + GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class); + reportRequest.setApplicationId(appId); + GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest); + ApplicationReport report = reportResponse.getApplicationReport(); + + LOG.info("Got application report from ASM for" + + ", appId=" + appId.getId() + + ", clientToken=" + report.getClientToken() + + ", appDiagnostics=" + report.getDiagnostics() + + ", appMasterHost=" + report.getHost() + + ", appQueue=" + report.getQueue() + + ", appMasterRpcPort=" + report.getRpcPort() + + ", appStartTime=" + report.getStartTime() + + ", yarnAppState=" + report.getYarnApplicationState().toString() + + ", distributedFinalState=" + report.getFinalApplicationStatus().toString() + + ", appTrackingUrl=" + report.getTrackingUrl() + + ", appUser=" + report.getUser()); + + YarnApplicationState state = report.getYarnApplicationState(); + FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); + if (YarnApplicationState.FINISHED == state) { + if (FinalApplicationStatus.SUCCEEDED == dsStatus) { + LOG.info("Application has completed successfully. Breaking monitoring loop"); + return true; + } + else { + LOG.info("Application did finished unsuccessfully." + + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + + ". 
Breaking monitoring loop"); + return false; + } + } + else if (YarnApplicationState.KILLED == state + || YarnApplicationState.FAILED == state) { + LOG.info("Application did not finish." + + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + + ". Breaking monitoring loop"); + return false; + } + + if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { + LOG.info("Reached client specified timeout for application. Killing application"); + killApplication(appId); + return false; + } + } + + } + + /** + * Kill a submitted application by sending a call to the ASM + * @param appId Application Id to be killed. + * @throws YarnRemoteException + */ + private void killApplication(ApplicationId appId) throws YarnRemoteException { + KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class); + // TODO clarify whether multiple jobs with the same app id can be submitted and be running at + // the same time. + // If yes, can we kill a particular attempt only? 
+ request.setApplicationId(appId); + // KillApplicationResponse response = applicationsManager.forceKillApplication(request); + // Response can be ignored as it is non-null on success or + // throws an exception in case of failures + applicationsManager.forceKillApplication(request); + } + + /** + * Connect to the Resource Manager/Applications Manager + * @return Handle to communicate with the ASM + * @throws IOException + */ + private void connectToASM() throws IOException { + + /* + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + applicationsManager = user.doAs(new PrivilegedAction() { + public ClientRMProtocol run() { + InetSocketAddress rmAddress = NetUtils.createSocketAddr(conf.get( + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); + LOG.info("Connecting to ResourceManager at " + rmAddress); + Configuration appsManagerServerConf = new Configuration(conf); + appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_INFO, + ClientRMSecurityInfo.class, SecurityInfo.class); + ClientRMProtocol asm = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, appsManagerServerConf)); + return asm; + } + }); + */ + YarnConfiguration yarnConf = new YarnConfiguration(conf); + InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( + YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS)); + LOG.info("Connecting to ResourceManager at " + rmAddress); + Configuration appsManagerServerConf = new Configuration(conf); + appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_INFO, + ClientRMSecurityInfo.class, SecurityInfo.class); + applicationsManager = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, appsManagerServerConf)); + } + + /** + * Get a new application from the ASM + * @return New Application + * @throws YarnRemoteException + */ + private GetNewApplicationResponse getApplication() throws YarnRemoteException { + 
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class); + GetNewApplicationResponse response = applicationsManager.getNewApplication(request); + LOG.info("Got new application id=" + response.getApplicationId()); + return response; + } + + private static String getTestRuntimeClasspath() { + + InputStream classpathFileStream = null; + BufferedReader reader = null; + String envClassPath = ""; + + LOG.info("Trying to generate classpath for app master from current thread's classpath"); + try { + + // Create classpath from generated classpath + // Check maven ppom.xml for generated classpath info + // Works if compile time env is same as runtime. Mainly tests. + ClassLoader thisClassLoader = + Thread.currentThread().getContextClassLoader(); + String generatedClasspathFile = "yarn-apps-ds-generated-classpath"; + classpathFileStream = + thisClassLoader.getResourceAsStream(generatedClasspathFile); + if (classpathFileStream == null) { + LOG.info("Could not classpath resource from class loader"); + return envClassPath; + } + LOG.info("Readable bytes from stream=" + classpathFileStream.available()); + reader = new BufferedReader(new InputStreamReader(classpathFileStream)); + String cp = reader.readLine(); + if (cp != null) { + envClassPath += cp.trim() + ":"; + } + // Put the file itself on classpath for tasks. + envClassPath += thisClassLoader.getResource(generatedClasspathFile).getFile(); + } catch (IOException e) { + LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + e.getMessage()); + } + + try { + if (classpathFileStream != null) { + classpathFileStream.close(); + } + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + LOG.info("Failed to close class path file stream or reader. 
Error=" + e.getMessage()); + } + return envClassPath; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java new file mode 100644 index 00000000000..b2fb81d0d9a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +/** + * Constants used in both Client and Application Master + */ +public class DSConstants { + + /** + * Environment key name pointing to the shell script's location + */ + public static final String DISTRIBUTEDSHELLSCRIPTLOCATION = "DISTRIBUTEDSHELLSCRIPTLOCATION"; + + /** + * Environment key name denoting the file timestamp for the shell script. 
+ * Used to validate the local resource. + */ + public static final String DISTRIBUTEDSHELLSCRIPTTIMESTAMP = "DISTRIBUTEDSHELLSCRIPTTIMESTAMP"; + + /** + * Environment key name denoting the file content length for the shell script. + * Used to validate the local resource. + */ + public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN"; +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java new file mode 100644 index 00000000000..d0407daa2b7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestDistributedShell { + + private static final Log LOG = + LogFactory.getLog(TestDistributedShell.class); + + protected static MiniYARNCluster yarnCluster = null; + protected static Configuration conf = new Configuration(); + + protected static String APPMASTER_JAR = "../hadoop-yarn-applications-distributedshell/target/hadoop-yarn-applications-distributedshell-0.24.0-SNAPSHOT.jar"; + + @BeforeClass + public static void setup() throws InterruptedException, IOException { + LOG.info("Starting up YARN cluster"); + if (yarnCluster == null) { + yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getName()); + yarnCluster.init(conf); + yarnCluster.start(); + } + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + LOG.info("setup thread sleep interrupted. message=" + e.getMessage()); + } + } + + @AfterClass + public static void tearDown() throws IOException { + if (yarnCluster != null) { + yarnCluster.stop(); + yarnCluster = null; + } + } + + @Test + public void testDSShell() throws Exception { + + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--shell_command", + "ls", + "--master_memory", + "1536", + "--container_memory", + "1536" + }; + + LOG.info("Initializing DS Client"); + Client client = new Client(); + boolean initSuccess = client.init(args); + assert(initSuccess); + LOG.info("Running DS Client"); + boolean result = client.run(); + + LOG.info("Client run completed. 
Result=" + result); + assert (result == true); + + } + + +} + diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml new file mode 100644 index 00000000000..713731004f6 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml @@ -0,0 +1,30 @@ + + + + + hadoop-yarn + org.apache.hadoop + ${yarn.version} + + 4.0.0 + org.apache.hadoop + hadoop-yarn-applications + hadoop-yarn-applications + pom + + + hadoop-yarn-applications-distributedshell + + diff --git a/hadoop-mapreduce-project/hadoop-yarn/pom.xml b/hadoop-mapreduce-project/hadoop-yarn/pom.xml index aad5e4a1378..2b73cdb70e9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/pom.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/pom.xml @@ -424,5 +424,6 @@ hadoop-yarn-api hadoop-yarn-common hadoop-yarn-server + hadoop-yarn-applications