merge -c 1437156 from trunk to branch-2 to fix YARN-277. Use AMRMClient in DistributedShell to exemplify the approach. Contributed by Bikas Saha
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1437159 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ebf72a71ba
commit
60230692dc
|
@ -84,6 +84,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
YARN-331. Fill in missing fair scheduler documentation. (sandyr via tucu)
|
||||
|
||||
YARN-277. Use AMRMClient in DistributedShell to exemplify the approach.
|
||||
(Bikas Saha via hitesh)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
|
@ -51,9 +50,6 @@ import org.apache.hadoop.yarn.api.ContainerManager;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
||||
//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
|
||||
|
@ -71,6 +67,9 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.client.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
|
@ -78,37 +77,64 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
|
|||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
/**
|
||||
* An ApplicationMaster for executing shell commands on a set of launched containers using the YARN framework.
|
||||
* An ApplicationMaster for executing shell commands on a set of launched
|
||||
* containers using the YARN framework.
|
||||
*
|
||||
* <p>This class is meant to act as an example on how to write yarn-based application masters. </p>
|
||||
* <p>
|
||||
* This class is meant to act as an example on how to write yarn-based
|
||||
* application masters.
|
||||
* </p>
|
||||
*
|
||||
* <p> The ApplicationMaster is started on a container by the <code>ResourceManager</code>'s launcher.
|
||||
* The first thing that the <code>ApplicationMaster</code> needs to do is to connect and register itself with
|
||||
* the <code>ResourceManager</code>. The registration sets up information within the <code>ResourceManager</code>
|
||||
* regarding what host:port the ApplicationMaster is listening on to provide any form of functionality to a client
|
||||
* as well as a tracking url that a client can use to keep track of status/job history if needed. </p>
|
||||
* <p>
|
||||
* The ApplicationMaster is started on a container by the
|
||||
* <code>ResourceManager</code>'s launcher. The first thing that the
|
||||
* <code>ApplicationMaster</code> needs to do is to connect and register itself
|
||||
* with the <code>ResourceManager</code>. The registration sets up information
|
||||
* within the <code>ResourceManager</code> regarding what host:port the
|
||||
* ApplicationMaster is listening on to provide any form of functionality to a
|
||||
* client as well as a tracking url that a client can use to keep track of
|
||||
* status/job history if needed.
|
||||
* </p>
|
||||
*
|
||||
* <p> The <code>ApplicationMaster</code> needs to send a heartbeat to the <code>ResourceManager</code> at regular intervals
|
||||
* to inform the <code>ResourceManager</code> that it is up and alive. The {@link AMRMProtocol#allocate} to the
|
||||
* <code>ResourceManager</code> from the <code>ApplicationMaster</code> acts as a heartbeat.
|
||||
* <p>
|
||||
* The <code>ApplicationMaster</code> needs to send a heartbeat to the
|
||||
* <code>ResourceManager</code> at regular intervals to inform the
|
||||
* <code>ResourceManager</code> that it is up and alive. The
|
||||
* {@link AMRMProtocol#allocate} to the <code>ResourceManager</code> from the
|
||||
* <code>ApplicationMaster</code> acts as a heartbeat.
|
||||
*
|
||||
* <p> For the actual handling of the job, the <code>ApplicationMaster</code> has to request the
|
||||
* <code>ResourceManager</code> via {@link AllocateRequest} for the required no. of containers using {@link ResourceRequest}
|
||||
* with the necessary resource specifications such as node location, computational (memory/disk/cpu) resource requirements.
|
||||
* The <code>ResourceManager</code> responds with an {@link AllocateResponse} that informs the <code>ApplicationMaster</code>
|
||||
* of the set of newly allocated containers, completed containers as well as current state of available resources. </p>
|
||||
* <p>
|
||||
* For the actual handling of the job, the <code>ApplicationMaster</code> has to
|
||||
* request the <code>ResourceManager</code> via {@link AllocateRequest} for the
|
||||
* required no. of containers using {@link ResourceRequest} with the necessary
|
||||
* resource specifications such as node location, computational
|
||||
* (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
|
||||
* responds with an {@link AllocateResponse} that informs the
|
||||
* <code>ApplicationMaster</code> of the set of newly allocated containers,
|
||||
* completed containers as well as current state of available resources.
|
||||
* </p>
|
||||
*
|
||||
* <p> For each allocated container, the <code>ApplicationMaster</code> can then set up the necessary launch context via
|
||||
* {@link ContainerLaunchContext} to specify the allocated container id, local resources required by the executable,
|
||||
* the environment to be setup for the executable, commands to execute, etc. and submit a {@link StartContainerRequest}
|
||||
* to the {@link ContainerManager} to launch and execute the defined commands on the given allocated container. </p>
|
||||
* <p>
|
||||
* For each allocated container, the <code>ApplicationMaster</code> can then set
|
||||
* up the necessary launch context via {@link ContainerLaunchContext} to specify
|
||||
* the allocated container id, local resources required by the executable, the
|
||||
* environment to be setup for the executable, commands to execute, etc. and
|
||||
* submit a {@link StartContainerRequest} to the {@link ContainerManager} to
|
||||
* launch and execute the defined commands on the given allocated container.
|
||||
* </p>
|
||||
*
|
||||
* <p> The <code>ApplicationMaster</code> can monitor the launched container by either querying the <code>ResourceManager</code>
|
||||
* using {@link AMRMProtocol#allocate} to get updates on completed containers or via the {@link ContainerManager}
|
||||
* by querying for the status of the allocated container's {@link ContainerId}.
|
||||
* <p>
|
||||
* The <code>ApplicationMaster</code> can monitor the launched container by
|
||||
* either querying the <code>ResourceManager</code> using
|
||||
* {@link AMRMProtocol#allocate} to get updates on completed containers or via
|
||||
* the {@link ContainerManager} by querying for the status of the allocated
|
||||
* container's {@link ContainerId}.
|
||||
*
|
||||
* <p> After the job has been completed, the <code>ApplicationMaster</code> has to send a {@link FinishApplicationMasterRequest}
|
||||
* to the <code>ResourceManager</code> to inform it that the <code>ApplicationMaster</code> has been completed.
|
||||
* <p>
|
||||
* After the job has been completed, the <code>ApplicationMaster</code> has to
|
||||
* send a {@link FinishApplicationMasterRequest} to the
|
||||
* <code>ResourceManager</code> to inform it that the
|
||||
* <code>ApplicationMaster</code> has been completed.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
|
@ -122,7 +148,7 @@ public class ApplicationMaster {
|
|||
private YarnRPC rpc;
|
||||
|
||||
// Handle to communicate with the Resource Manager
|
||||
private AMRMProtocol resourceManager;
|
||||
private AMRMClient resourceManager;
|
||||
|
||||
// Application Attempt Id ( combination of attemptId and fail count )
|
||||
private ApplicationAttemptId appAttemptID;
|
||||
|
@ -131,7 +157,7 @@ public class ApplicationMaster {
|
|||
// For status update for clients - yet to be implemented
|
||||
// Hostname of the container
|
||||
private String appMasterHostname = "";
|
||||
// Port on which the app master listens for status update requests from clients
|
||||
// Port on which the app master listens for status updates from clients
|
||||
private int appMasterRpcPort = 0;
|
||||
// Tracking url to which app master publishes info for clients to monitor
|
||||
private String appMasterTrackingUrl = "";
|
||||
|
@ -144,9 +170,6 @@ public class ApplicationMaster {
|
|||
// Priority of the request
|
||||
private int requestPriority;
|
||||
|
||||
// Incremental counter for rpc calls to the RM
|
||||
private AtomicInteger rmRequestID = new AtomicInteger();
|
||||
|
||||
// Simple flag to denote whether all work is done
|
||||
private boolean appDone = false;
|
||||
// Counter for completed containers ( complete denotes successful or failed )
|
||||
|
@ -157,7 +180,7 @@ public class ApplicationMaster {
|
|||
// Count of failed containers
|
||||
private AtomicInteger numFailedContainers = new AtomicInteger();
|
||||
// Count of containers already requested from the RM
|
||||
// Needed as once requested, we should not request for containers again and again.
|
||||
// Needed as once requested, we should not request for containers again.
|
||||
// Only request for more if the original requirement changes.
|
||||
private AtomicInteger numRequestedContainers = new AtomicInteger();
|
||||
|
||||
|
@ -179,9 +202,6 @@ public class ApplicationMaster {
|
|||
// Hardcoded path to shell script in launch container's local env
|
||||
private final String ExecShellStringPath = "ExecShellScript.sh";
|
||||
|
||||
// Containers to be released
|
||||
private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
|
||||
|
||||
// Launch threads
|
||||
private List<Thread> launchThreads = new ArrayList<Thread>();
|
||||
|
||||
|
@ -205,8 +225,7 @@ public class ApplicationMaster {
|
|||
if (result) {
|
||||
LOG.info("Application Master completed successfully. exiting");
|
||||
System.exit(0);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
LOG.info("Application Master failed. exiting");
|
||||
System.exit(2);
|
||||
}
|
||||
|
@ -221,7 +240,8 @@ public class ApplicationMaster {
|
|||
Map<String, String> envs = System.getenv();
|
||||
for (Map.Entry<String, String> env : envs.entrySet()) {
|
||||
LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
|
||||
System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue());
|
||||
System.out.println("System env: key=" + env.getKey() + ", val="
|
||||
+ env.getValue());
|
||||
}
|
||||
|
||||
String cmd = "ls -al";
|
||||
|
@ -231,9 +251,10 @@ public class ApplicationMaster {
|
|||
pr = run.exec(cmd);
|
||||
pr.waitFor();
|
||||
|
||||
BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
|
||||
BufferedReader buf = new BufferedReader(new InputStreamReader(
|
||||
pr.getInputStream()));
|
||||
String line = "";
|
||||
while ((line=buf.readLine())!=null) {
|
||||
while ((line = buf.readLine()) != null) {
|
||||
LOG.info("System CWD content: " + line);
|
||||
System.out.println("System CWD content: " + line);
|
||||
}
|
||||
|
@ -247,11 +268,13 @@ public class ApplicationMaster {
|
|||
|
||||
public ApplicationMaster() throws Exception {
|
||||
// Set up the configuration and RPC
|
||||
conf = new Configuration();
|
||||
conf = new YarnConfiguration();
|
||||
rpc = YarnRPC.create(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse command line options
|
||||
*
|
||||
* @param args Command line args
|
||||
* @return Whether init successful and run should be invoked
|
||||
* @throws ParseException
|
||||
|
@ -260,13 +283,19 @@ public class ApplicationMaster {
|
|||
public boolean init(String[] args) throws ParseException, IOException {
|
||||
|
||||
Options opts = new Options();
|
||||
opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
|
||||
opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
|
||||
opts.addOption("shell_script", true, "Location of the shell script to be executed");
|
||||
opts.addOption("app_attempt_id", true,
|
||||
"App Attempt ID. Not to be used unless for testing purposes");
|
||||
opts.addOption("shell_command", true,
|
||||
"Shell command to be executed by the Application Master");
|
||||
opts.addOption("shell_script", true,
|
||||
"Location of the shell script to be executed");
|
||||
opts.addOption("shell_args", true, "Command line args for the shell script");
|
||||
opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
|
||||
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
|
||||
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
|
||||
opts.addOption("shell_env", true,
|
||||
"Environment for shell script. Specified as env_key=env_val pairs");
|
||||
opts.addOption("container_memory", true,
|
||||
"Amount of memory in MB to be requested to run the shell command");
|
||||
opts.addOption("num_containers", true,
|
||||
"No. of containers on which the shell command needs to be executed");
|
||||
opts.addOption("priority", true, "Application Priority. Default 0");
|
||||
opts.addOption("debug", false, "Dump out debug information");
|
||||
|
||||
|
@ -275,7 +304,8 @@ public class ApplicationMaster {
|
|||
|
||||
if (args.length == 0) {
|
||||
printUsage(opts);
|
||||
throw new IllegalArgumentException("No args specified for application master to initialize");
|
||||
throw new IllegalArgumentException(
|
||||
"No args specified for application master to initialize");
|
||||
}
|
||||
|
||||
if (cliParser.hasOption("help")) {
|
||||
|
@ -289,7 +319,6 @@ public class ApplicationMaster {
|
|||
|
||||
Map<String, String> envs = System.getenv();
|
||||
|
||||
appAttemptID = Records.newRecord(ApplicationAttemptId.class);
|
||||
if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) {
|
||||
appAttemptID = ConverterUtils.toApplicationAttemptId(envs
|
||||
.get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV));
|
||||
|
@ -297,22 +326,24 @@ public class ApplicationMaster {
|
|||
if (cliParser.hasOption("app_attempt_id")) {
|
||||
String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
|
||||
appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException("Application Attempt Id not set in the environment");
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
"Application Attempt Id not set in the environment");
|
||||
}
|
||||
} else {
|
||||
ContainerId containerId = ConverterUtils.toContainerId(envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
|
||||
ContainerId containerId = ConverterUtils.toContainerId(envs
|
||||
.get(ApplicationConstants.AM_CONTAINER_ID_ENV));
|
||||
appAttemptID = containerId.getApplicationAttemptId();
|
||||
}
|
||||
|
||||
LOG.info("Application master for app"
|
||||
+ ", appId=" + appAttemptID.getApplicationId().getId()
|
||||
+ ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp()
|
||||
LOG.info("Application master for app" + ", appId="
|
||||
+ appAttemptID.getApplicationId().getId() + ", clustertimestamp="
|
||||
+ appAttemptID.getApplicationId().getClusterTimestamp()
|
||||
+ ", attemptId=" + appAttemptID.getAttemptId());
|
||||
|
||||
if (!cliParser.hasOption("shell_command")) {
|
||||
throw new IllegalArgumentException("No shell command specified to be executed by application master");
|
||||
throw new IllegalArgumentException(
|
||||
"No shell command specified to be executed by application master");
|
||||
}
|
||||
shellCommand = cliParser.getOptionValue("shell_command");
|
||||
|
||||
|
@ -330,8 +361,8 @@ public class ApplicationMaster {
|
|||
}
|
||||
String key = env.substring(0, index);
|
||||
String val = "";
|
||||
if (index < (env.length()-1)) {
|
||||
val = env.substring(index+1);
|
||||
if (index < (env.length() - 1)) {
|
||||
val = env.substring(index + 1);
|
||||
}
|
||||
shellEnv.put(key, val);
|
||||
}
|
||||
|
@ -341,32 +372,37 @@ public class ApplicationMaster {
|
|||
shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
|
||||
|
||||
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
|
||||
shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
|
||||
shellScriptPathTimestamp = Long.valueOf(envs
|
||||
.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
|
||||
}
|
||||
if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
|
||||
shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
|
||||
shellScriptPathLen = Long.valueOf(envs
|
||||
.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
|
||||
}
|
||||
|
||||
if (!shellScriptPath.isEmpty()
|
||||
&& (shellScriptPathTimestamp <= 0
|
||||
|| shellScriptPathLen <= 0)) {
|
||||
LOG.error("Illegal values in env for shell script path"
|
||||
+ ", path=" + shellScriptPath
|
||||
+ ", len=" + shellScriptPathLen
|
||||
+ ", timestamp=" + shellScriptPathTimestamp);
|
||||
throw new IllegalArgumentException("Illegal values in env for shell script path");
|
||||
&& (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
|
||||
LOG.error("Illegal values in env for shell script path" + ", path="
|
||||
+ shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp="
|
||||
+ shellScriptPathTimestamp);
|
||||
throw new IllegalArgumentException(
|
||||
"Illegal values in env for shell script path");
|
||||
}
|
||||
}
|
||||
|
||||
containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
|
||||
numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
|
||||
requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
|
||||
containerMemory = Integer.parseInt(cliParser.getOptionValue(
|
||||
"container_memory", "10"));
|
||||
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
|
||||
"num_containers", "1"));
|
||||
requestPriority = Integer.parseInt(cliParser
|
||||
.getOptionValue("priority", "0"));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to print usage
|
||||
*
|
||||
* @param opts Parsed command line options
|
||||
*/
|
||||
private void printUsage(Options opts) {
|
||||
|
@ -375,224 +411,236 @@ public class ApplicationMaster {
|
|||
|
||||
/**
|
||||
* Main run function for the application master
|
||||
*
|
||||
* @throws YarnRemoteException
|
||||
*/
|
||||
public boolean run() throws YarnRemoteException {
|
||||
LOG.info("Starting ApplicationMaster");
|
||||
|
||||
// Connect to ResourceManager
|
||||
resourceManager = connectToRM();
|
||||
resourceManager = new AMRMClientImpl(appAttemptID);
|
||||
resourceManager.init(conf);
|
||||
resourceManager.start();
|
||||
|
||||
// Setup local RPC Server to accept status requests directly from clients
|
||||
// TODO need to setup a protocol for client to be able to communicate to the RPC server
|
||||
// TODO use the rpc port info to register with the RM for the client to send requests to this app master
|
||||
try {
|
||||
// Setup local RPC Server to accept status requests directly from clients
|
||||
// TODO need to setup a protocol for client to be able to communicate to
|
||||
// the RPC server
|
||||
// TODO use the rpc port info to register with the RM for the client to
|
||||
// send requests to this app master
|
||||
|
||||
// Register self with ResourceManager
|
||||
RegisterApplicationMasterResponse response = registerToRM();
|
||||
// Dump out information about cluster capability as seen by the resource manager
|
||||
int minMem = response.getMinimumResourceCapability().getMemory();
|
||||
int maxMem = response.getMaximumResourceCapability().getMemory();
|
||||
LOG.info("Min mem capabililty of resources in this cluster " + minMem);
|
||||
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
|
||||
// Register self with ResourceManager
|
||||
RegisterApplicationMasterResponse response = resourceManager
|
||||
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
|
||||
appMasterTrackingUrl);
|
||||
// Dump out information about cluster capability as seen by the
|
||||
// resource manager
|
||||
int minMem = response.getMinimumResourceCapability().getMemory();
|
||||
int maxMem = response.getMaximumResourceCapability().getMemory();
|
||||
LOG.info("Min mem capabililty of resources in this cluster " + minMem);
|
||||
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
|
||||
|
||||
// A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be
|
||||
// a multiple of the min value and cannot exceed the max.
|
||||
// If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
|
||||
if (containerMemory < minMem) {
|
||||
LOG.info("Container memory specified below min threshold of cluster. Using min value."
|
||||
+ ", specified=" + containerMemory
|
||||
+ ", min=" + minMem);
|
||||
containerMemory = minMem;
|
||||
}
|
||||
else if (containerMemory > maxMem) {
|
||||
LOG.info("Container memory specified above max threshold of cluster. Using max value."
|
||||
+ ", specified=" + containerMemory
|
||||
+ ", max=" + maxMem);
|
||||
containerMemory = maxMem;
|
||||
}
|
||||
|
||||
// Setup heartbeat emitter
|
||||
// TODO poll RM every now and then with an empty request to let RM know that we are alive
|
||||
// The heartbeat interval after which an AM is timed out by the RM is defined by a config setting:
|
||||
// RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
|
||||
// The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter
|
||||
// is not required.
|
||||
|
||||
// Setup ask for containers from RM
|
||||
// Send request for containers to RM
|
||||
// Until we get our fully allocated quota, we keep on polling RM for containers
|
||||
// Keep looping until all the containers are launched and shell script executed on them
|
||||
// ( regardless of success/failure).
|
||||
|
||||
int loopCounter = -1;
|
||||
|
||||
while (numCompletedContainers.get() < numTotalContainers
|
||||
&& !appDone) {
|
||||
loopCounter++;
|
||||
|
||||
// log current state
|
||||
LOG.info("Current application state: loop=" + loopCounter
|
||||
+ ", appDone=" + appDone
|
||||
+ ", total=" + numTotalContainers
|
||||
+ ", requested=" + numRequestedContainers
|
||||
+ ", completed=" + numCompletedContainers
|
||||
+ ", failed=" + numFailedContainers
|
||||
+ ", currentAllocated=" + numAllocatedContainers);
|
||||
|
||||
// Sleep before each loop when asking RM for containers
|
||||
// to avoid flooding RM with spurious requests when it
|
||||
// need not have any available containers
|
||||
// Sleeping for 1000 ms.
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Sleep interrupted " + e.getMessage());
|
||||
// A resource ask has to be at least the minimum of the capability of the
|
||||
// cluster, the value has to be a multiple of the min value and cannot
|
||||
// exceed the max.
|
||||
// If it is not an exact multiple of min, the RM will allocate to the
|
||||
// nearest multiple of min
|
||||
if (containerMemory < minMem) {
|
||||
LOG.info("Container memory specified below min threshold of cluster."
|
||||
+ " Using min value." + ", specified=" + containerMemory + ", min="
|
||||
+ minMem);
|
||||
containerMemory = minMem;
|
||||
} else if (containerMemory > maxMem) {
|
||||
LOG.info("Container memory specified above max threshold of cluster."
|
||||
+ " Using max value." + ", specified=" + containerMemory + ", max="
|
||||
+ maxMem);
|
||||
containerMemory = maxMem;
|
||||
}
|
||||
|
||||
// No. of containers to request
|
||||
// For the first loop, askCount will be equal to total containers needed
|
||||
// From that point on, askCount will always be 0 as current implementation
|
||||
// does not change its ask on container failures.
|
||||
int askCount = numTotalContainers - numRequestedContainers.get();
|
||||
numRequestedContainers.addAndGet(askCount);
|
||||
// Setup heartbeat emitter
|
||||
// TODO poll RM every now and then with an empty request to let RM know
|
||||
// that we are alive
|
||||
// The heartbeat interval after which an AM is timed out by the RM is
|
||||
// defined by a config setting:
|
||||
// RM_AM_EXPIRY_INTERVAL_MS with default defined by
|
||||
// DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
|
||||
// The allocate calls to the RM count as heartbeats so, for now,
|
||||
// this additional heartbeat emitter is not required.
|
||||
|
||||
// Setup request to be sent to RM to allocate containers
|
||||
List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
|
||||
if (askCount > 0) {
|
||||
ResourceRequest containerAsk = setupContainerAskForRM(askCount);
|
||||
resourceReq.add(containerAsk);
|
||||
}
|
||||
// Setup ask for containers from RM
|
||||
// Send request for containers to RM
|
||||
// Until we get our fully allocated quota, we keep on polling RM for
|
||||
// containers
|
||||
// Keep looping until all the containers are launched and shell script
|
||||
// executed on them ( regardless of success/failure).
|
||||
|
||||
// Send the request to RM
|
||||
LOG.info("Asking RM for containers"
|
||||
+ ", askCount=" + askCount);
|
||||
AMResponse amResp =sendContainerAskToRM(resourceReq);
|
||||
int loopCounter = -1;
|
||||
|
||||
// Retrieve list of allocated containers from the response
|
||||
List<Container> allocatedContainers = amResp.getAllocatedContainers();
|
||||
LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
|
||||
numAllocatedContainers.addAndGet(allocatedContainers.size());
|
||||
for (Container allocatedContainer : allocatedContainers) {
|
||||
LOG.info("Launching shell command on a new container."
|
||||
+ ", containerId=" + allocatedContainer.getId()
|
||||
+ ", containerNode=" + allocatedContainer.getNodeId().getHost()
|
||||
+ ":" + allocatedContainer.getNodeId().getPort()
|
||||
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
|
||||
+ ", containerState" + allocatedContainer.getState()
|
||||
+ ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
|
||||
//+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
|
||||
while (numCompletedContainers.get() < numTotalContainers && !appDone) {
|
||||
loopCounter++;
|
||||
|
||||
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
|
||||
Thread launchThread = new Thread(runnableLaunchContainer);
|
||||
// log current state
|
||||
LOG.info("Current application state: loop=" + loopCounter
|
||||
+ ", appDone=" + appDone + ", total=" + numTotalContainers
|
||||
+ ", requested=" + numRequestedContainers + ", completed="
|
||||
+ numCompletedContainers + ", failed=" + numFailedContainers
|
||||
+ ", currentAllocated=" + numAllocatedContainers);
|
||||
|
||||
// launch and start the container on a separate thread to keep the main thread unblocked
|
||||
// as all containers may not be allocated at one go.
|
||||
launchThreads.add(launchThread);
|
||||
launchThread.start();
|
||||
}
|
||||
// Sleep before each loop when asking RM for containers
|
||||
// to avoid flooding RM with spurious requests when it
|
||||
// need not have any available containers
|
||||
// Sleeping for 1000 ms.
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Sleep interrupted " + e.getMessage());
|
||||
}
|
||||
|
||||
// Check what the current available resources in the cluster are
|
||||
// TODO should we do anything if the available resources are not enough?
|
||||
Resource availableResources = amResp.getAvailableResources();
|
||||
LOG.info("Current available resources in the cluster " + availableResources);
|
||||
// No. of containers to request
|
||||
// For the first loop, askCount will be equal to total containers needed
|
||||
// From that point on, askCount will always be 0 as current
|
||||
// implementation does not change its ask on container failures.
|
||||
int askCount = numTotalContainers - numRequestedContainers.get();
|
||||
numRequestedContainers.addAndGet(askCount);
|
||||
|
||||
// Check the completed containers
|
||||
List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
|
||||
LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
|
||||
for (ContainerStatus containerStatus : completedContainers) {
|
||||
LOG.info("Got container status for containerID= " + containerStatus.getContainerId()
|
||||
+ ", state=" + containerStatus.getState()
|
||||
+ ", exitStatus=" + containerStatus.getExitStatus()
|
||||
+ ", diagnostics=" + containerStatus.getDiagnostics());
|
||||
if (askCount > 0) {
|
||||
ContainerRequest containerAsk = setupContainerAskForRM(askCount);
|
||||
resourceManager.addContainerRequest(containerAsk);
|
||||
}
|
||||
|
||||
// non complete containers should not be here
|
||||
assert(containerStatus.getState() == ContainerState.COMPLETE);
|
||||
// Send the request to RM
|
||||
LOG.info("Asking RM for containers" + ", askCount=" + askCount);
|
||||
AMResponse amResp = sendContainerAskToRM();
|
||||
|
||||
// increment counters for completed/failed containers
|
||||
int exitStatus = containerStatus.getExitStatus();
|
||||
if (0 != exitStatus) {
|
||||
// container failed
|
||||
if (-100 != exitStatus) {
|
||||
// shell script failed
|
||||
// counts as completed
|
||||
// Retrieve list of allocated containers from the response
|
||||
List<Container> allocatedContainers = amResp.getAllocatedContainers();
|
||||
LOG.info("Got response from RM for container ask, allocatedCnt="
|
||||
+ allocatedContainers.size());
|
||||
numAllocatedContainers.addAndGet(allocatedContainers.size());
|
||||
for (Container allocatedContainer : allocatedContainers) {
|
||||
LOG.info("Launching shell command on a new container."
|
||||
+ ", containerId=" + allocatedContainer.getId()
|
||||
+ ", containerNode=" + allocatedContainer.getNodeId().getHost()
|
||||
+ ":" + allocatedContainer.getNodeId().getPort()
|
||||
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
|
||||
+ ", containerState" + allocatedContainer.getState()
|
||||
+ ", containerResourceMemory"
|
||||
+ allocatedContainer.getResource().getMemory());
|
||||
// + ", containerToken"
|
||||
// +allocatedContainer.getContainerToken().getIdentifier().toString());
|
||||
|
||||
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
|
||||
allocatedContainer);
|
||||
Thread launchThread = new Thread(runnableLaunchContainer);
|
||||
|
||||
// launch and start the container on a separate thread to keep
|
||||
// the main thread unblocked
|
||||
// as all containers may not be allocated at one go.
|
||||
launchThreads.add(launchThread);
|
||||
launchThread.start();
|
||||
}
|
||||
|
||||
// Check what the current available resources in the cluster are
|
||||
// TODO should we do anything if the available resources are not enough?
|
||||
Resource availableResources = amResp.getAvailableResources();
|
||||
LOG.info("Current available resources in the cluster "
|
||||
+ availableResources);
|
||||
|
||||
// Check the completed containers
|
||||
List<ContainerStatus> completedContainers = amResp
|
||||
.getCompletedContainersStatuses();
|
||||
LOG.info("Got response from RM for container ask, completedCnt="
|
||||
+ completedContainers.size());
|
||||
for (ContainerStatus containerStatus : completedContainers) {
|
||||
LOG.info("Got container status for containerID="
|
||||
+ containerStatus.getContainerId() + ", state="
|
||||
+ containerStatus.getState() + ", exitStatus="
|
||||
+ containerStatus.getExitStatus() + ", diagnostics="
|
||||
+ containerStatus.getDiagnostics());
|
||||
|
||||
// non complete containers should not be here
|
||||
assert (containerStatus.getState() == ContainerState.COMPLETE);
|
||||
|
||||
// increment counters for completed/failed containers
|
||||
int exitStatus = containerStatus.getExitStatus();
|
||||
if (0 != exitStatus) {
|
||||
// container failed
|
||||
if (-100 != exitStatus) {
|
||||
// shell script failed
|
||||
// counts as completed
|
||||
numCompletedContainers.incrementAndGet();
|
||||
numFailedContainers.incrementAndGet();
|
||||
} else {
|
||||
// something else bad happened
|
||||
// app job did not complete for some reason
|
||||
// we should re-try as the container was lost for some reason
|
||||
numAllocatedContainers.decrementAndGet();
|
||||
numRequestedContainers.decrementAndGet();
|
||||
// we do not need to release the container as it would be done
|
||||
// by the RM/CM.
|
||||
}
|
||||
} else {
|
||||
// nothing to do
|
||||
// container completed successfully
|
||||
numCompletedContainers.incrementAndGet();
|
||||
numFailedContainers.incrementAndGet();
|
||||
}
|
||||
else {
|
||||
// something else bad happened
|
||||
// app job did not complete for some reason
|
||||
// we should re-try as the container was lost for some reason
|
||||
numAllocatedContainers.decrementAndGet();
|
||||
numRequestedContainers.decrementAndGet();
|
||||
// we do not need to release the container as it would be done
|
||||
// by the RM/CM.
|
||||
LOG.info("Container completed successfully." + ", containerId="
|
||||
+ containerStatus.getContainerId());
|
||||
}
|
||||
}
|
||||
else {
|
||||
// nothing to do
|
||||
// container completed successfully
|
||||
numCompletedContainers.incrementAndGet();
|
||||
LOG.info("Container completed successfully."
|
||||
+ ", containerId=" + containerStatus.getContainerId());
|
||||
if (numCompletedContainers.get() == numTotalContainers) {
|
||||
appDone = true;
|
||||
}
|
||||
|
||||
}
|
||||
if (numCompletedContainers.get() == numTotalContainers) {
|
||||
appDone = true;
|
||||
LOG.info("Current application state: loop=" + loopCounter
|
||||
+ ", appDone=" + appDone + ", total=" + numTotalContainers
|
||||
+ ", requested=" + numRequestedContainers + ", completed="
|
||||
+ numCompletedContainers + ", failed=" + numFailedContainers
|
||||
+ ", currentAllocated=" + numAllocatedContainers);
|
||||
|
||||
// TODO
|
||||
// Add a timeout handling layer
|
||||
// for misbehaving shell commands
|
||||
}
|
||||
|
||||
LOG.info("Current application state: loop=" + loopCounter
|
||||
+ ", appDone=" + appDone
|
||||
+ ", total=" + numTotalContainers
|
||||
+ ", requested=" + numRequestedContainers
|
||||
+ ", completed=" + numCompletedContainers
|
||||
+ ", failed=" + numFailedContainers
|
||||
+ ", currentAllocated=" + numAllocatedContainers);
|
||||
|
||||
// TODO
|
||||
// Add a timeout handling layer
|
||||
// for misbehaving shell commands
|
||||
}
|
||||
|
||||
// Join all launched threads
|
||||
// needed for when we time out
|
||||
// and we need to release containers
|
||||
for (Thread launchThread : launchThreads) {
|
||||
try {
|
||||
launchThread.join(10000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Exception thrown in thread join: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
// Join all launched threads
|
||||
// needed for when we time out
|
||||
// and we need to release containers
|
||||
for (Thread launchThread : launchThreads) {
|
||||
try {
|
||||
launchThread.join(10000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Exception thrown in thread join: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// When the application completes, it should send a finish application signal
|
||||
// to the RM
|
||||
LOG.info("Application completed. Signalling finish to RM");
|
||||
// When the application completes, it should send a finish application
|
||||
// signal to the RM
|
||||
LOG.info("Application completed. Signalling finish to RM");
|
||||
|
||||
FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
|
||||
finishReq.setAppAttemptId(appAttemptID);
|
||||
boolean isSuccess = true;
|
||||
if (numFailedContainers.get() == 0) {
|
||||
finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
|
||||
FinalApplicationStatus appStatus;
|
||||
String appMessage = null;
|
||||
boolean isSuccess = true;
|
||||
if (numFailedContainers.get() == 0) {
|
||||
appStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
} else {
|
||||
appStatus = FinalApplicationStatus.FAILED;
|
||||
appMessage = "Diagnostics." + ", total=" + numTotalContainers
|
||||
+ ", completed=" + numCompletedContainers.get() + ", allocated="
|
||||
+ numAllocatedContainers.get() + ", failed="
|
||||
+ numFailedContainers.get();
|
||||
isSuccess = false;
|
||||
}
|
||||
resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
|
||||
return isSuccess;
|
||||
} finally {
|
||||
resourceManager.stop();
|
||||
}
|
||||
else {
|
||||
finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
|
||||
String diagnostics = "Diagnostics."
|
||||
+ ", total=" + numTotalContainers
|
||||
+ ", completed=" + numCompletedContainers.get()
|
||||
+ ", allocated=" + numAllocatedContainers.get()
|
||||
+ ", failed=" + numFailedContainers.get();
|
||||
finishReq.setDiagnostics(diagnostics);
|
||||
isSuccess = false;
|
||||
}
|
||||
resourceManager.finishApplicationMaster(finishReq);
|
||||
return isSuccess;
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread to connect to the {@link ContainerManager} and
|
||||
* launch the container that will execute the shell command.
|
||||
* Thread to connect to the {@link ContainerManager} and launch the container
|
||||
* that will execute the shell command.
|
||||
*/
|
||||
private class LaunchContainerRunnable implements Runnable {
|
||||
|
||||
|
@ -612,15 +660,16 @@ public class ApplicationMaster {
|
|||
* Helper function to connect to CM
|
||||
*/
|
||||
private void connectToCM() {
|
||||
LOG.debug("Connecting to ContainerManager for containerid=" + container.getId());
|
||||
LOG.debug("Connecting to ContainerManager for containerid="
|
||||
+ container.getId());
|
||||
String cmIpPortStr = container.getNodeId().getHost() + ":"
|
||||
+ container.getNodeId().getPort();
|
||||
InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
|
||||
LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
|
||||
this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
|
||||
this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class,
|
||||
cmAddress, conf));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
/**
|
||||
* Connects to CM, sets up container launch context
|
||||
|
@ -631,8 +680,10 @@ public class ApplicationMaster {
|
|||
// Connect to ContainerManager
|
||||
connectToCM();
|
||||
|
||||
LOG.info("Setting up container launch container for containerid=" + container.getId());
|
||||
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
|
||||
LOG.info("Setting up container launch container for containerid="
|
||||
+ container.getId());
|
||||
ContainerLaunchContext ctx = Records
|
||||
.newRecord(ContainerLaunchContext.class);
|
||||
|
||||
ctx.setContainerId(container.getId());
|
||||
ctx.setResource(container.getResource());
|
||||
|
@ -648,18 +699,20 @@ public class ApplicationMaster {
|
|||
// Set the local resources
|
||||
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
|
||||
|
||||
// The container for the eventual shell commands needs its own local resources too.
|
||||
// In this scenario, if a shell script is specified, we need to have it copied
|
||||
// and made available to the container.
|
||||
// The container for the eventual shell commands needs its own local
|
||||
// resources too.
|
||||
// In this scenario, if a shell script is specified, we need to have it
|
||||
// copied and made available to the container.
|
||||
if (!shellScriptPath.isEmpty()) {
|
||||
LocalResource shellRsrc = Records.newRecord(LocalResource.class);
|
||||
shellRsrc.setType(LocalResourceType.FILE);
|
||||
shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||
try {
|
||||
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
|
||||
shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
|
||||
shellScriptPath)));
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Error when trying to use shell script path specified in env"
|
||||
+ ", path=" + shellScriptPath);
|
||||
LOG.error("Error when trying to use shell script path specified"
|
||||
+ " in env, path=" + shellScriptPath);
|
||||
e.printStackTrace();
|
||||
|
||||
// A failure scenario on bad input such as invalid shell script path
|
||||
|
@ -689,11 +742,6 @@ public class ApplicationMaster {
|
|||
// Set args for the shell command if any
|
||||
vargs.add(shellArgs);
|
||||
// Add log redirect params
|
||||
// TODO
|
||||
// We should redirect the output to hdfs instead of local logs
|
||||
// so as to be able to look at the final output after the containers
|
||||
// have been released.
|
||||
// Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err]
|
||||
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
|
||||
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
|
||||
|
||||
|
@ -707,131 +755,78 @@ public class ApplicationMaster {
|
|||
commands.add(command.toString());
|
||||
ctx.setCommands(commands);
|
||||
|
||||
StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
|
||||
StartContainerRequest startReq = Records
|
||||
.newRecord(StartContainerRequest.class);
|
||||
startReq.setContainerLaunchContext(ctx);
|
||||
try {
|
||||
cm.startContainer(startReq);
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.info("Start container failed for :"
|
||||
+ ", containerId=" + container.getId());
|
||||
LOG.info("Start container failed for :" + ", containerId="
|
||||
+ container.getId());
|
||||
e.printStackTrace();
|
||||
// TODO do we need to release this container?
|
||||
}
|
||||
|
||||
// Get container status?
|
||||
// Left commented out as the shell scripts are short lived
|
||||
// and we are relying on the status for completed containers from RM to detect status
|
||||
// and we are relying on the status for completed containers
|
||||
// from RM to detect status
|
||||
|
||||
// GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
|
||||
// statusReq.setContainerId(container.getId());
|
||||
// GetContainerStatusResponse statusResp;
|
||||
//try {
|
||||
//statusResp = cm.getContainerStatus(statusReq);
|
||||
// LOG.info("Container Status"
|
||||
// + ", id=" + container.getId()
|
||||
// + ", status=" +statusResp.getStatus());
|
||||
//} catch (YarnRemoteException e) {
|
||||
//e.printStackTrace();
|
||||
//}
|
||||
// GetContainerStatusRequest statusReq =
|
||||
// Records.newRecord(GetContainerStatusRequest.class);
|
||||
// statusReq.setContainerId(container.getId());
|
||||
// GetContainerStatusResponse statusResp;
|
||||
// try {
|
||||
// statusResp = cm.getContainerStatus(statusReq);
|
||||
// LOG.info("Container Status"
|
||||
// + ", id=" + container.getId()
|
||||
// + ", status=" +statusResp.getStatus());
|
||||
// } catch (YarnRemoteException e) {
|
||||
// e.printStackTrace();
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to the Resource Manager
|
||||
* @return Handle to communicate with the RM
|
||||
*/
|
||||
private AMRMProtocol connectToRM() {
|
||||
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
||||
InetSocketAddress rmAddress = yarnConf.getSocketAddr(
|
||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||
return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the Application Master to the Resource Manager
|
||||
* @return the registration response from the RM
|
||||
* @throws YarnRemoteException
|
||||
*/
|
||||
private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
|
||||
RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
|
||||
|
||||
// set the required info into the registration request:
|
||||
// application attempt id,
|
||||
// host on which the app master is running
|
||||
// rpc port on which the app master accepts requests from the client
|
||||
// tracking url for the app master
|
||||
appMasterRequest.setApplicationAttemptId(appAttemptID);
|
||||
appMasterRequest.setHost(appMasterHostname);
|
||||
appMasterRequest.setRpcPort(appMasterRpcPort);
|
||||
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
|
||||
|
||||
return resourceManager.registerApplicationMaster(appMasterRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup the request that will be sent to the RM for the container ask.
|
||||
*
|
||||
* @param numContainers Containers to ask for from RM
|
||||
* @return the setup ResourceRequest to be sent to RM
|
||||
*/
|
||||
private ResourceRequest setupContainerAskForRM(int numContainers) {
|
||||
ResourceRequest request = Records.newRecord(ResourceRequest.class);
|
||||
|
||||
private ContainerRequest setupContainerAskForRM(int numContainers) {
|
||||
// setup requirements for hosts
|
||||
// whether a particular rack/host is needed
|
||||
// Refer to apis under org.apache.hadoop.net for more
|
||||
// details on how to figure out rack/host mapping.
|
||||
// using * as any host will do for the distributed shell app
|
||||
request.setHostName("*");
|
||||
|
||||
// set no. of containers needed
|
||||
request.setNumContainers(numContainers);
|
||||
|
||||
// set the priority for the request
|
||||
Priority pri = Records.newRecord(Priority.class);
|
||||
// TODO - what is the range for priority? how to decide?
|
||||
pri.setPriority(requestPriority);
|
||||
request.setPriority(pri);
|
||||
|
||||
// Set up resource type requirements
|
||||
// For now, only memory is supported so we set memory requirements
|
||||
Resource capability = Records.newRecord(Resource.class);
|
||||
capability.setMemory(containerMemory);
|
||||
request.setCapability(capability);
|
||||
|
||||
ContainerRequest request = new ContainerRequest(capability, null, null,
|
||||
pri, numContainers);
|
||||
LOG.info("Requested container ask: " + request.toString());
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ask RM to allocate given no. of containers to this Application Master
|
||||
*
|
||||
* @param requestedContainers Containers to ask for from RM
|
||||
* @return Response from RM to AM with allocated containers
|
||||
* @throws YarnRemoteException
|
||||
*/
|
||||
private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
|
||||
throws YarnRemoteException {
|
||||
AllocateRequest req = Records.newRecord(AllocateRequest.class);
|
||||
req.setResponseId(rmRequestID.incrementAndGet());
|
||||
req.setApplicationAttemptId(appAttemptID);
|
||||
req.addAllAsks(requestedContainers);
|
||||
req.addAllReleases(releasedContainers);
|
||||
req.setProgress((float)numCompletedContainers.get()/numTotalContainers);
|
||||
private AMResponse sendContainerAskToRM() throws YarnRemoteException {
|
||||
float progressIndicator = (float) numCompletedContainers.get()
|
||||
/ numTotalContainers;
|
||||
|
||||
LOG.info("Sending request to RM for containers"
|
||||
+ ", requestedSet=" + requestedContainers.size()
|
||||
+ ", releasedSet=" + releasedContainers.size()
|
||||
+ ", progress=" + req.getProgress());
|
||||
LOG.info("Sending request to RM for containers" + ", progress="
|
||||
+ progressIndicator);
|
||||
|
||||
for (ResourceRequest rsrcReq : requestedContainers) {
|
||||
LOG.info("Requested container ask: " + rsrcReq.toString());
|
||||
}
|
||||
for (ContainerId id : releasedContainers) {
|
||||
LOG.info("Released container, id=" + id.getId());
|
||||
}
|
||||
|
||||
AllocateResponse resp = resourceManager.allocate(req);
|
||||
AllocateResponse resp = resourceManager.allocate(progressIndicator);
|
||||
return resp.getAMResponse();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,10 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.applications.distributedshell;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -212,7 +209,7 @@ public class Client extends YarnClientImpl {
|
|||
/**
|
||||
*/
|
||||
public Client() throws Exception {
|
||||
this(new Configuration());
|
||||
this(new YarnConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -120,6 +120,7 @@ public class TestDistributedShell {
|
|||
boolean exceptionThrown = false;
|
||||
try {
|
||||
boolean initSuccess = client.init(args);
|
||||
Assert.assertTrue(initSuccess);
|
||||
}
|
||||
catch (IllegalArgumentException e) {
|
||||
exceptionThrown = true;
|
||||
|
|
Loading…
Reference in New Issue