YARN-6685. Add job count in to SLS JSON input format. (Yufei Gu via Haibo Chen)
This commit is contained in:
parent
c21c260392
commit
0ba8cda135
|
@ -395,18 +395,28 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
String queue = jsonJob.get("job.queue.name").toString();
|
String queue = jsonJob.get("job.queue.name").toString();
|
||||||
increaseQueueAppNum(queue);
|
increaseQueueAppNum(queue);
|
||||||
|
|
||||||
String oldAppId = (String)jsonJob.get("job.id");
|
|
||||||
if (oldAppId == null) {
|
|
||||||
oldAppId = Integer.toString(AM_ID);
|
|
||||||
}
|
|
||||||
|
|
||||||
String amType = (String)jsonJob.get("am.type");
|
String amType = (String)jsonJob.get("am.type");
|
||||||
if (amType == null) {
|
if (amType == null) {
|
||||||
amType = SLSUtils.DEFAULT_JOB_TYPE;
|
amType = SLSUtils.DEFAULT_JOB_TYPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
|
int jobCount = 1;
|
||||||
getTaskContainers(jsonJob), null, getAMContainerResource(jsonJob));
|
if (jsonJob.containsKey("job.count")) {
|
||||||
|
jobCount = Integer.parseInt(jsonJob.get("job.count").toString());
|
||||||
|
}
|
||||||
|
jobCount = Math.max(jobCount, 1);
|
||||||
|
|
||||||
|
String oldAppId = (String)jsonJob.get("job.id");
|
||||||
|
// Job id is generated automatically if this job configuration allows
|
||||||
|
// multiple job instances
|
||||||
|
if(jobCount > 1) {
|
||||||
|
oldAppId = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < jobCount; i++) {
|
||||||
|
runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
|
||||||
|
getTaskContainers(jsonJob), null, getAMContainerResource(jsonJob));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ContainerSimulator> getTaskContainers(Map jsonJob)
|
private List<ContainerSimulator> getTaskContainers(Map jsonJob)
|
||||||
|
@ -732,6 +742,10 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
|
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
|
||||||
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
|
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
|
||||||
boolean isTracked = trackedApps.contains(oldJobId);
|
boolean isTracked = trackedApps.contains(oldJobId);
|
||||||
|
|
||||||
|
if (oldJobId == null) {
|
||||||
|
oldJobId = Integer.toString(AM_ID);
|
||||||
|
}
|
||||||
AM_ID++;
|
AM_ID++;
|
||||||
|
|
||||||
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
|
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
|
||||||
|
|
|
@ -336,8 +336,9 @@ Here we provide an example format of the sls json file, which contains 2 jobs. T
|
||||||
"job.start.ms" : 0, // job start time
|
"job.start.ms" : 0, // job start time
|
||||||
"job.end.ms" : 95375, // job finish time, optional, the default value is 0
|
"job.end.ms" : 95375, // job finish time, optional, the default value is 0
|
||||||
"job.queue.name" : "sls_queue_1", // the queue job will be submitted to
|
"job.queue.name" : "sls_queue_1", // the queue job will be submitted to
|
||||||
"job.id" : "job_1", // the job id used to track the job, optional, the default value is an zero-based integer increasing with number of jobs
|
"job.id" : "job_1", // the job id used to track the job, optional. The default value, an zero-based integer increasing with number of jobs, is used if this is not specified or job.count > 1
|
||||||
"job.user" : "default", // user, optional, the default value is "default"
|
"job.user" : "default", // user, optional, the default value is "default"
|
||||||
|
"job.count" : 1, // number of jobs, optional, the default value is 1
|
||||||
"job.tasks" : [ {
|
"job.tasks" : [ {
|
||||||
"count": 1, // number of tasks, optional, the default value is 1
|
"count": 1, // number of tasks, optional, the default value is 1
|
||||||
"container.host" : "/default-rack/node1", // host the container asks for
|
"container.host" : "/default-rack/node1", // host the container asks for
|
||||||
|
|
Loading…
Reference in New Issue