From 4e1d5a0d71d4bdde0d8b7b4c2a9571279496daaa Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 20 Dec 2011 23:13:15 +0000 Subject: [PATCH] MAPREDUCE-3391. Making a trivial change to correct a log message in DistributedShell app's AM. Contributed by Subroto Sanyal. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1221516 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../distributedshell/ApplicationMaster.java | 134 +++++++++--------- 2 files changed, 70 insertions(+), 67 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2cb12be1e57..b558442d8b5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -161,6 +161,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3518. mapred queue -info -showJobs throws NPE. (Jonathan Eagles via mahadev) + MAPREDUCE-3391. Making a trivial change to correct a log message in + DistributedShell app's AM. (Subroto Sanyal via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index ab371f51fe4..611bdf88678 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -184,7 +184,7 @@ public class ApplicationMaster { private CopyOnWriteArrayList releasedContainers = new CopyOnWriteArrayList(); // Launch threads - private List launchThreads = new ArrayList(); + private List launchThreads = new ArrayList(); /** * @param args Command line args @@ -194,7 +194,7 @@ public static void main(String[] args) { try { ApplicationMaster appMaster = new ApplicationMaster(); LOG.info("Initializing ApplicationMaster"); - boolean doRun = appMaster.init(args); + boolean doRun = appMaster.init(args); if (!doRun) { System.exit(0); } @@ -202,14 +202,14 @@ public static void main(String[] args) { } catch (Throwable t) { LOG.fatal("Error running ApplicationMaster", t); System.exit(1); - } + } if (result) { LOG.info("Application Master completed successfully. exiting"); System.exit(0); } else { LOG.info("Application Master failed. exiting"); - System.exit(2); + System.exit(2); } } @@ -218,7 +218,7 @@ public static void main(String[] args) { */ private void dumpOutDebugInfo() { - LOG.info("Dump debug output"); + LOG.info("Dump debug output"); Map envs = System.getenv(); for (Map.Entry env : envs.entrySet()) { LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue()); @@ -277,7 +277,7 @@ public boolean init(String[] args) throws ParseException, IOException { if (args.length == 0) { printUsage(opts); throw new IllegalArgumentException("No args specified for application master to initialize"); - } + } if (cliParser.hasOption("help")) { printUsage(opts); @@ -297,8 +297,8 @@ public boolean init(String[] args) throws ParseException, IOException { appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); } else { - throw new IllegalArgumentException("Application Attempt Id not set in the environment"); - } + throw new IllegalArgumentException("Application Attempt Id not set in the environment"); + } } else { ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)); appAttemptID = containerId.getApplicationAttemptId(); @@ -338,11 +338,11 @@ public boolean init(String[] args) throws ParseException, IOException { if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); - if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { - shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); - } + if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { + shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); + } if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { - shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); + shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); } if (!shellScriptPath.isEmpty() @@ -351,7 +351,7 @@ public boolean init(String[] args) throws ParseException, IOException { LOG.error("Illegal values in env for shell script path" + ", path=" + shellScriptPath + ", len=" + shellScriptPathLen - + ", timestamp=" + shellScriptPathTimestamp); + + ", timestamp=" + shellScriptPathTimestamp); throw new IllegalArgumentException("Illegal values in env for shell script path"); } } @@ -368,7 +368,7 @@ public boolean init(String[] args) throws ParseException, IOException { * @param opts Parsed command line options */ private void printUsage(Options opts) { - new HelpFormatter().printHelp("ApplicationMaster", opts); + new HelpFormatter().printHelp("ApplicationMaster", opts); } /** @@ -378,7 +378,7 @@ private void printUsage(Options opts) { public boolean run() throws YarnRemoteException { LOG.info("Starting ApplicationMaster"); - // Connect to ResourceManager + // Connect to ResourceManager resourceManager = connectToRM(); // Setup local RPC Server to accept status requests directly from clients @@ -395,7 +395,7 @@ public boolean run() throws YarnRemoteException { // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be // a multiple of the min value and cannot exceed the max. - // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min + // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min if (containerMemory < minMem) { LOG.info("Container memory specified below min threshold of cluster. Using min value." + ", specified=" + containerMemory @@ -409,14 +409,14 @@ else if (containerMemory > maxMem) { containerMemory = maxMem; } - // Setup heartbeat emitter + // Setup heartbeat emitter // TODO poll RM every now and then with an empty request to let RM know that we are alive // The heartbeat interval after which an AM is timed out by the RM is defined by a config setting: // RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS // The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter // is not required. - // Setup ask for containers from RM + // Setup ask for containers from RM // Send request for containers to RM // Until we get our fully allocated quota, we keep on polling RM for containers // Keep looping until all the containers are launched and shell script executed on them @@ -426,7 +426,7 @@ else if (containerMemory > maxMem) { while (numCompletedContainers.get() < numTotalContainers && !appDone) { - loopCounter++; + loopCounter++; // log current state LOG.info("Current application state: loop=" + loopCounter @@ -435,7 +435,7 @@ else if (containerMemory > maxMem) { + ", requested=" + numRequestedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers - + ", currentAllocated=" + numAllocatedContainers); + + ", currentAllocated=" + numAllocatedContainers); // Sleep before each loop when asking RM for containers // to avoid flooding RM with spurious requests when it @@ -444,7 +444,7 @@ else if (containerMemory > maxMem) { try { Thread.sleep(1000); } catch (InterruptedException e) { - LOG.info("Sleep interrupted " + e.getMessage()); + LOG.info("Sleep interrupted " + e.getMessage()); } // No. of containers to request @@ -457,14 +457,14 @@ else if (containerMemory > maxMem) { // Setup request to be sent to RM to allocate containers List resourceReq = new ArrayList(); if (askCount > 0) { - ResourceRequest containerAsk = setupContainerAskForRM(askCount); + ResourceRequest containerAsk = setupContainerAskForRM(askCount); resourceReq.add(containerAsk); } // Send the request to RM LOG.info("Asking RM for containers" + ", askCount=" + askCount); - AMResponse amResp = sendContainerAskToRM(resourceReq); + AMResponse amResp =sendContainerAskToRM(resourceReq); // Retrieve list of allocated containers from the response List allocatedContainers = amResp.getAllocatedContainers(); @@ -478,10 +478,10 @@ else if (containerMemory > maxMem) { + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerState" + allocatedContainer.getState() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory()); - // + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString()); + //+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString()); LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer); - Thread launchThread = new Thread(runnableLaunchContainer); + Thread launchThread = new Thread(runnableLaunchContainer); // launch and start the container on a separate thread to keep the main thread unblocked // as all containers may not be allocated at one go. @@ -492,14 +492,14 @@ else if (containerMemory > maxMem) { // Check what the current available resources in the cluster are // TODO should we do anything if the available resources are not enough? Resource availableResources = amResp.getAvailableResources(); - LOG.info("Current available resources in the cluster " + availableResources); + LOG.info("Current available resources in the cluster " + availableResources); - // Check the completed containers + // Check the completed containers List completedContainers = amResp.getCompletedContainersStatuses(); LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size()); - for (ContainerStatus containerStatus : completedContainers) { + for (ContainerStatus containerStatus : completedContainers) { LOG.info("Got container status for containerID= " + containerStatus.getContainerId() - + ", state=" + containerStatus.getState() + + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics()); @@ -514,7 +514,7 @@ else if (containerMemory > maxMem) { // shell script failed // counts as completed numCompletedContainers.incrementAndGet(); - numFailedContainers.incrementAndGet(); + numFailedContainers.incrementAndGet(); } else { // something else bad happened @@ -541,15 +541,15 @@ else if (containerMemory > maxMem) { LOG.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone - + ", total=" + numTotalContainers + + ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers - + ", currentAllocated=" + numAllocatedContainers); + + ", currentAllocated=" + numAllocatedContainers); // TODO // Add a timeout handling layer - // for misbehaving shell commands + // for misbehaving shell commands } // Join all launched threads @@ -561,7 +561,7 @@ else if (containerMemory > maxMem) { } catch (InterruptedException e) { LOG.info("Exception thrown in thread join: " + e.getMessage()); e.printStackTrace(); - } + } } // When the application completes, it should send a finish application signal @@ -610,10 +610,11 @@ public LaunchContainerRunnable(Container lcontainer) { * Helper function to connect to CM */ private void connectToCM() { - String cmIpPortStr = container.getNodeId().getHost() + ":" - + container.getNodeId().getPort(); - InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); - LOG.info("Connecting to ResourceManager at " + cmIpPortStr); + LOG.debug("Connecting to ContainerManager for containerid=" + container.getId()); + String cmIpPortStr = container.getNodeId().getHost() + ":" + + container.getNodeId().getPort(); + InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); + LOG.info("Connecting to ContainerManager at " + cmIpPortStr); this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf)); } @@ -626,7 +627,6 @@ private void connectToCM() { */ public void run() { // Connect to ContainerManager - LOG.info("Connecting to container manager for containerid=" + container.getId()); connectToCM(); LOG.info("Setting up container launch container for containerid=" + container.getId()); @@ -654,7 +654,7 @@ public void run() { if (!shellScriptPath.isEmpty()) { LocalResource shellRsrc = Records.newRecord(LocalResource.class); shellRsrc.setType(LocalResourceType.FILE); - shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION); try { shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath))); } catch (URISyntaxException e) { @@ -664,17 +664,17 @@ public void run() { // A failure scenario on bad input such as invalid shell script path // We know we cannot continue launching the container - // so we should release it. + // so we should release it. // TODO numCompletedContainers.incrementAndGet(); numFailedContainers.incrementAndGet(); - return; + return; } shellRsrc.setTimestamp(shellScriptPathTimestamp); shellRsrc.setSize(shellScriptPathLen); localResources.put(ExecShellStringPath, shellRsrc); - } - ctx.setLocalResources(localResources); + } + ctx.setLocalResources(localResources); // Set the necessary command to execute on the allocated container Vector vargs = new Vector(5); @@ -686,7 +686,7 @@ public void run() { vargs.add(ExecShellStringPath); } - // Set args for the shell command if any + // Set args for the shell command if any vargs.add(shellArgs); // Add log redirect params // TODO @@ -722,19 +722,19 @@ public void run() { // Left commented out as the shell scripts are short lived // and we are relying on the status for completed containers from RM to detect status - // GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class); - // statusReq.setContainerId(container.getId()); - // GetContainerStatusResponse statusResp; - // try { - // statusResp = cm.getContainerStatus(statusReq); - // LOG.info("Container Status" - // + ", id=" + container.getId() - // + ", status=" +statusResp.getStatus()); - // } catch (YarnRemoteException e) { - // e.printStackTrace(); - // } - } - } + // GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class); + // statusReq.setContainerId(container.getId()); + // GetContainerStatusResponse statusResp; + //try { + //statusResp = cm.getContainerStatus(statusReq); + // LOG.info("Container Status" + // + ", id=" + container.getId() + // + ", status=" +statusResp.getStatus()); + //} catch (YarnRemoteException e) { + //e.printStackTrace(); + //} + } + } /** * Connect to the Resource Manager @@ -744,25 +744,25 @@ private AMRMProtocol connectToRM() { YarnConfiguration yarnConf = new YarnConfiguration(conf); InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); LOG.info("Connecting to ResourceManager at " + rmAddress); return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf)); - } + } /** * Register the Application Master to the Resource Manager * @return the registration response from the RM * @throws YarnRemoteException */ - private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException { - RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class); + private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException { + RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class); // set the required info into the registration request: // application attempt id, // host on which the app master is running // rpc port on which the app master accepts requests from the client // tracking url for the app master - appMasterRequest.setApplicationAttemptId(appAttemptID); + appMasterRequest.setApplicationAttemptId(appAttemptID); appMasterRequest.setHost(appMasterHostname); appMasterRequest.setRpcPort(appMasterRpcPort); appMasterRequest.setTrackingUrl(appMasterTrackingUrl); @@ -792,7 +792,7 @@ private ResourceRequest setupContainerAskForRM(int numContainers) { Priority pri = Records.newRecord(Priority.class); // TODO - what is the range for priority? how to decide? pri.setPriority(requestPriority); - request.setPriority(pri); + request.setPriority(pri); // Set up resource type requirements // For now, only memory is supported so we set memory requirements @@ -810,7 +810,7 @@ private ResourceRequest setupContainerAskForRM(int numContainers) { * @throws YarnRemoteException */ private AMResponse sendContainerAskToRM(List requestedContainers) - throws YarnRemoteException { + throws YarnRemoteException { AllocateRequest req = Records.newRecord(AllocateRequest.class); req.setResponseId(rmRequestID.incrementAndGet()); req.setApplicationAttemptId(appAttemptID); @@ -830,7 +830,7 @@ private AMResponse sendContainerAskToRM(List requestedContainer LOG.info("Released container, id=" + id.getId()); } - AllocateResponse resp = resourceManager.allocate(req); - return resp.getAMResponse(); + AllocateResponse resp = resourceManager.allocate(req); + return resp.getAMResponse(); } }