YARN-10547. Decouple job parsing logic from SLSRunner. Contributed by Szilard Nemeth.

This commit is contained in:
9uapaw 2022-03-24 06:16:13 +01:00
parent 9edfe30a60
commit 077c6c62d6
16 changed files with 1272 additions and 425 deletions

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import java.util.List;
public abstract class AMDefinition {
protected int jobCount;
protected String amType;
protected String user;
protected String queue;
protected long jobStartTime;
protected long jobFinishTime;
protected List<ContainerSimulator> taskContainers;
protected Resource amResource;
protected String labelExpression;
protected String oldAppId;
public AMDefinition(AmDefinitionBuilder builder) {
this.jobStartTime = builder.jobStartTime;
this.jobFinishTime = builder.jobFinishTime;
this.amType = builder.amType;
this.taskContainers = builder.taskContainers;
this.labelExpression = builder.labelExpression;
this.user = builder.user;
this.amResource = builder.amResource;
this.queue = builder.queue;
this.jobCount = builder.jobCount;
this.oldAppId = builder.jobId;
}
public String getAmType() {
return amType;
}
public String getUser() {
return user;
}
public String getOldAppId() {
return oldAppId;
}
public long getJobStartTime() {
return jobStartTime;
}
public long getJobFinishTime() {
return jobFinishTime;
}
public List<ContainerSimulator> getTaskContainers() {
return taskContainers;
}
public Resource getAmResource() {
return amResource;
}
public String getLabelExpression() {
return labelExpression;
}
public String getQueue() {
return queue;
}
public int getJobCount() {
return jobCount;
}
public abstract static class AmDefinitionBuilder {
private static final String DEFAULT_USER = "default";
protected int jobCount = 1;
protected String amType = AMDefinitionFactory.DEFAULT_JOB_TYPE;
protected String user = DEFAULT_USER;
protected String queue;
protected String jobId;
protected long jobStartTime;
protected long jobFinishTime;
protected List<ContainerSimulator> taskContainers;
protected Resource amResource;
protected String labelExpression = null;
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls;
import java.util.Map;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class AMDefinitionFactory {
private static final Logger LOG = LoggerFactory.getLogger(
AMDefinitionFactory.class);
public final static String DEFAULT_JOB_TYPE = "mapreduce";
private AMDefinitionFactory() {}
public static AMDefinitionSLS createFromSlsTrace(Map<?, ?> jsonJob,
SLSRunner slsRunner) throws YarnException {
AMDefinitionSLS amDefinition = AMDefinitionSLS.Builder.create(jsonJob)
.withAmType(SLSConfiguration.AM_TYPE)
.withAmResource(getAMContainerResourceSLS(jsonJob, slsRunner))
.withTaskContainers(
AMDefinitionSLS.getTaskContainers(jsonJob, slsRunner))
.withJobStartTime(SLSConfiguration.JOB_START_MS)
.withJobFinishTime(SLSConfiguration.JOB_END_MS)
.withLabelExpression(SLSConfiguration.JOB_LABEL_EXPR)
.withUser(SLSConfiguration.JOB_USER)
.withQueue(SLSConfiguration.JOB_QUEUE_NAME)
.withJobId(SLSConfiguration.JOB_ID)
.withJobCount(SLSConfiguration.JOB_COUNT)
.build();
slsRunner.increaseQueueAppNum(amDefinition.getQueue());
return amDefinition;
}
public static AMDefinitionRumen createFromRumenTrace(LoggedJob job,
long baselineTimeMs, SLSRunner slsRunner) throws YarnException {
AMDefinitionRumen amDefinition = AMDefinitionRumen.Builder.create()
.withAmType(DEFAULT_JOB_TYPE)
.withAmResource(getAMContainerResourceSynthAndRumen(slsRunner))
.withTaskContainers(
AMDefinitionRumen.getTaskContainers(job, slsRunner))
.withJobStartTime(job.getSubmitTime())
.withJobFinishTime(job.getFinishTime())
.withBaseLineTimeMs(baselineTimeMs)
.withUser(job.getUser())
.withQueue(job.getQueue().getValue())
.withJobId(job.getJobID().toString())
.build();
slsRunner.increaseQueueAppNum(amDefinition.getQueue());
return amDefinition;
}
public static AMDefinitionSynth createFromSynth(SynthJob job,
SLSRunner slsRunner) throws YarnException {
AMDefinitionSynth amDefinition =
AMDefinitionSynth.Builder.create()
.withAmType(job.getType())
.withAmResource(getAMContainerResourceSynthAndRumen(slsRunner))
.withTaskContainers(
AMDefinitionSynth.getTaskContainers(job, slsRunner))
.withUser(job.getUser())
.withQueue(job.getQueueName())
.withJobId(job.getJobID().toString())
.withJobStartTime(job.getSubmissionTime())
.withJobFinishTime(job.getSubmissionTime() + job.getDuration())
.withBaseLineTimeMs(0)
.build();
slsRunner.increaseQueueAppNum(amDefinition.getQueue());
return amDefinition;
}
private static Resource getAMContainerResourceSLS(Map<?, ?> jsonJob,
Configured configured) {
Resource amContainerResource =
SLSConfiguration.getAMContainerResource(configured.getConf());
if (jsonJob == null) {
return amContainerResource;
}
ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
for (ResourceInformation info : infors) {
String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
if (jsonJob.containsKey(key)) {
long value = Long.parseLong(jsonJob.get(key).toString());
amContainerResource.setResourceValue(info.getName(), value);
}
}
return amContainerResource;
}
private static Resource getAMContainerResourceSynthAndRumen(
Configured configured) {
return SLSConfiguration.getAMContainerResource(configured.getConf());
}
static void adjustTimeValuesToBaselineTime(AMDefinition amDef,
AMDefinition.AmDefinitionBuilder builder, long baselineTimeMs) {
builder.jobStartTime -= baselineTimeMs;
builder.jobFinishTime -= baselineTimeMs;
if (builder.jobStartTime < 0) {
LOG.warn("Warning: reset job {} start time to 0.", amDef.getOldAppId());
builder.jobFinishTime = builder.jobFinishTime - builder.jobStartTime;
builder.jobStartTime = 0;
}
amDef.jobStartTime = builder.jobStartTime;
}
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls;
import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.tools.rumen.LoggedTask;
import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.tools.rumen.datatypes.UserName;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime;
public class AMDefinitionRumen extends AMDefinition {
public final static int DEFAULT_MAPPER_PRIORITY = 20;
private final static int DEFAULT_REDUCER_PRIORITY = 10;
public AMDefinitionRumen(AmDefinitionBuilder builder) {
super(builder);
}
public static List<ContainerSimulator> getTaskContainers(LoggedJob job,
SLSRunner slsRunner) throws YarnException {
List<ContainerSimulator> containerList = new ArrayList<>();
TaskContainerDefinition.Builder builder =
TaskContainerDefinition.Builder.create()
.withCount(1)
.withResource(slsRunner.getDefaultContainerResource())
.withExecutionType(ExecutionType.GUARANTEED)
.withAllocationId(-1)
.withRequestDelay(0);
// mapper
for (LoggedTask mapTask : job.getMapTasks()) {
if (mapTask.getAttempts().size() == 0) {
throw new YarnException("Invalid map task, no attempt for a mapper!");
}
LoggedTaskAttempt taskAttempt =
mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
TaskContainerDefinition containerDef = builder
.withHostname(taskAttempt.getHostName().getValue())
.withDuration(taskAttempt.getFinishTime() -
taskAttempt.getStartTime())
.withPriority(DEFAULT_MAPPER_PRIORITY)
.withType("map")
.build();
containerList.add(
ContainerSimulator.createFromTaskContainerDefinition(containerDef));
}
// reducer
for (LoggedTask reduceTask : job.getReduceTasks()) {
if (reduceTask.getAttempts().size() == 0) {
throw new YarnException(
"Invalid reduce task, no attempt for a reducer!");
}
LoggedTaskAttempt taskAttempt =
reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
TaskContainerDefinition containerDef = builder
.withHostname(taskAttempt.getHostName().getValue())
.withDuration(taskAttempt.getFinishTime() -
taskAttempt.getStartTime())
.withPriority(DEFAULT_REDUCER_PRIORITY)
.withType("reduce")
.build();
containerList.add(
ContainerSimulator.createFromTaskContainerDefinition(containerDef));
}
return containerList;
}
public static final class Builder extends AmDefinitionBuilder {
private long baselineTimeMs;
private Builder() {
}
public static Builder create() {
return new Builder();
}
public Builder withAmType(String amType) {
this.amType = amType;
return this;
}
public Builder withUser(UserName user) {
if (user != null) {
this.user = user.getValue();
}
return this;
}
public Builder withQueue(String queue) {
this.queue = queue;
return this;
}
public Builder withJobId(String oldJobId) {
this.jobId = oldJobId;
return this;
}
public Builder withJobStartTime(long time) {
this.jobStartTime = time;
return this;
}
public Builder withJobFinishTime(long time) {
this.jobFinishTime = time;
return this;
}
public Builder withBaseLineTimeMs(long baselineTimeMs) {
this.baselineTimeMs = baselineTimeMs;
return this;
}
public Builder withLabelExpression(String expr) {
this.labelExpression = expr;
return this;
}
public AMDefinitionRumen.Builder withTaskContainers(
List<ContainerSimulator> taskContainers) {
this.taskContainers = taskContainers;
return this;
}
public AMDefinitionRumen.Builder withAmResource(Resource amResource) {
this.amResource = amResource;
return this;
}
public AMDefinitionRumen build() {
AMDefinitionRumen amDef = new AMDefinitionRumen(this);
if (baselineTimeMs == 0) {
baselineTimeMs = jobStartTime;
}
adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs);
return amDef;
}
}
}

View File

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class AMDefinitionSLS extends AMDefinition {
public AMDefinitionSLS(AmDefinitionBuilder builder) {
super(builder);
}
public String getQueue() {
return queue;
}
public static List<ContainerSimulator> getTaskContainers(Map<?, ?> jsonJob,
SLSRunner slsRunner) throws YarnException {
List<Map<?, ?>> tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
if (tasks == null || tasks.size() == 0) {
throw new YarnException("No task for the job!");
}
List<ContainerSimulator> containers = new ArrayList<>();
for (Map<?, ?> jsonTask : tasks) {
TaskContainerDefinition containerDef =
TaskContainerDefinition.Builder.create()
.withCount(jsonTask, SLSConfiguration.COUNT)
.withHostname((String) jsonTask.get(SLSConfiguration.TASK_HOST))
.withDuration(jsonTask, SLSConfiguration.TASK_DURATION_MS)
.withDurationLegacy(jsonTask, SLSConfiguration.DURATION_MS)
.withTaskStart(jsonTask, SLSConfiguration.TASK_START_MS)
.withTaskFinish(jsonTask, SLSConfiguration.TASK_END_MS)
.withResource(getResourceForContainer(jsonTask, slsRunner))
.withPriority(jsonTask, SLSConfiguration.TASK_PRIORITY)
.withType(jsonTask, SLSConfiguration.TASK_TYPE)
.withExecutionType(jsonTask, SLSConfiguration.TASK_EXECUTION_TYPE)
.withAllocationId(jsonTask, SLSConfiguration.TASK_ALLOCATION_ID)
.withRequestDelay(jsonTask, SLSConfiguration.TASK_REQUEST_DELAY)
.build();
for (int i = 0; i < containerDef.getCount(); i++) {
containers.add(ContainerSimulator.
createFromTaskContainerDefinition(containerDef));
}
}
return containers;
}
private static Resource getResourceForContainer(Map<?, ?> jsonTask,
SLSRunner slsRunner) {
Resource res = slsRunner.getDefaultContainerResource();
ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
for (ResourceInformation info : infors) {
if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
long value = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
.toString());
res.setResourceValue(info.getName(), value);
}
}
return res;
}
public static final class Builder extends AmDefinitionBuilder {
private final Map<?, ?> jsonJob;
private Builder(Map<?, ?> jsonJob) {
this.jsonJob = jsonJob;
}
public static Builder create(Map<?, ?> jsonJob) {
return new Builder(jsonJob);
}
public Builder withAmType(String key) {
if (jsonJob.containsKey(key)) {
String amType = (String) jsonJob.get(key);
if (amType != null) {
this.amType = amType;
}
}
return this;
}
public Builder withUser(String key) {
if (jsonJob.containsKey(key)) {
String user = (String) jsonJob.get(key);
if (user != null) {
this.user = user;
}
}
return this;
}
public Builder withQueue(String key) {
if (jsonJob.containsKey(key)) {
this.queue = jsonJob.get(key).toString();
}
return this;
}
public Builder withJobId(String key) {
if (jsonJob.containsKey(key)) {
this.jobId = (String) jsonJob.get(key);
}
return this;
}
public Builder withJobCount(String key) {
if (jsonJob.containsKey(key)) {
jobCount = Integer.parseInt(jsonJob.get(key).toString());
jobCount = Math.max(jobCount, 1);
}
return this;
}
public Builder withJobStartTime(String key) {
if (jsonJob.containsKey(key)) {
this.jobStartTime = Long.parseLong(jsonJob.get(key).toString());
}
return this;
}
public Builder withJobFinishTime(String key) {
if (jsonJob.containsKey(key)) {
this.jobFinishTime = Long.parseLong(jsonJob.get(key).toString());
}
return this;
}
public Builder withLabelExpression(String key) {
if (jsonJob.containsKey(key)) {
this.labelExpression = jsonJob.get(key).toString();
}
return this;
}
public AMDefinitionSLS.Builder withTaskContainers(
List<ContainerSimulator> taskContainers) {
this.taskContainers = taskContainers;
return this;
}
public AMDefinitionSLS.Builder withAmResource(Resource amResource) {
this.amResource = amResource;
return this;
}
public AMDefinitionSLS build() {
AMDefinitionSLS amDef = new AMDefinitionSLS(this);
// Job id is generated automatically if this job configuration allows
// multiple job instances
if (jobCount > 1) {
amDef.oldAppId = null;
} else {
amDef.oldAppId = jobId;
}
amDef.jobCount = jobCount;
return amDef;
}
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls;
import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
public class AMDefinitionSynth extends AMDefinition {
public AMDefinitionSynth(AmDefinitionBuilder builder) {
super(builder);
}
public static List<ContainerSimulator> getTaskContainers(
SynthJob job, SLSRunner slsRunner) throws YarnException {
List<ContainerSimulator> containerList = new ArrayList<>();
ArrayList<NodeId> keyAsArray = new ArrayList<>(
slsRunner.getNmMap().keySet());
Random rand = new Random(slsRunner.getStjp().getSeed());
for (SynthJob.SynthTask task : job.getTasks()) {
RMNode node = getRandomNode(slsRunner, keyAsArray, rand);
TaskContainerDefinition containerDef =
TaskContainerDefinition.Builder.create()
.withCount(1)
.withHostname("/" + node.getRackName() + "/" + node.getHostName())
.withDuration(task.getTime())
.withResource(Resource
.newInstance((int) task.getMemory(), (int) task.getVcores()))
.withPriority(task.getPriority())
.withType(task.getType())
.withExecutionType(task.getExecutionType())
.withAllocationId(-1)
.withRequestDelay(0)
.build();
containerList.add(
ContainerSimulator.createFromTaskContainerDefinition(containerDef));
}
return containerList;
}
private static RMNode getRandomNode(SLSRunner slsRunner,
ArrayList<NodeId> keyAsArray, Random rand) {
int randomIndex = rand.nextInt(keyAsArray.size());
return slsRunner.getNmMap().get(keyAsArray.get(randomIndex)).getNode();
}
public static final class Builder extends AmDefinitionBuilder {
private long baselineTimeMs;
private Builder() {
}
public static Builder create() {
return new Builder();
}
public Builder withAmType(String amType) {
this.amType = amType;
return this;
}
public Builder withUser(String user) {
if (user != null) {
this.user = user;
}
return this;
}
public Builder withQueue(String queue) {
this.queue = queue;
return this;
}
public Builder withJobId(String oldJobId) {
this.jobId = oldJobId;
return this;
}
public Builder withJobStartTime(long time) {
this.jobStartTime = time;
return this;
}
public Builder withJobFinishTime(long time) {
this.jobFinishTime = time;
return this;
}
public Builder withBaseLineTimeMs(long baselineTimeMs) {
this.baselineTimeMs = baselineTimeMs;
return this;
}
public AMDefinitionSynth.Builder withLabelExpression(String expr) {
this.labelExpression = expr;
return this;
}
public AMDefinitionSynth.Builder withTaskContainers(
List<ContainerSimulator> taskContainers) {
this.taskContainers = taskContainers;
return this;
}
public AMDefinitionSynth.Builder withAmResource(Resource amResource) {
this.amResource = amResource;
return this;
}
public AMDefinitionSynth build() {
AMDefinitionSynth amDef = new AMDefinitionSynth(this);
if (baselineTimeMs == 0) {
baselineTimeMs = jobStartTime;
}
adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs);
return amDef;
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls;
import org.apache.hadoop.yarn.api.records.ReservationId;
import java.util.Map;
public class JobDefinition {
private AMDefinition amDefinition;
private ReservationId reservationId;
private long deadline;
private Map<String, String> params;
public AMDefinition getAmDefinition() {
return amDefinition;
}
public ReservationId getReservationId() {
return reservationId;
}
public long getDeadline() {
return deadline;
}
//Currently unused
public Map<String, String> getParams() {
return params;
}
public static final class Builder {
private AMDefinition amDefinition;
private ReservationId reservationId;
private long deadline;
private Map<String, String> params;
private Builder() {
}
public static Builder create() {
return new Builder();
}
public Builder withAmDefinition(AMDefinition amDefinition) {
this.amDefinition = amDefinition;
return this;
}
public Builder withReservationId(ReservationId reservationId) {
this.reservationId = reservationId;
return this;
}
public Builder withDeadline(long deadline) {
this.deadline = deadline;
return this;
}
public Builder withParams(Map<String, String> params) {
this.params = params;
return this;
}
public JobDefinition build() {
JobDefinition jobDef = new JobDefinition();
jobDef.params = this.params;
jobDef.amDefinition = this.amDefinition;
jobDef.reservationId = this.reservationId;
jobDef.deadline = this.deadline;
return jobDef;
}
}
}

View File

@ -23,12 +23,10 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@ -59,13 +57,10 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.TableMapping;
import org.apache.hadoop.tools.rumen.JobTraceReader;
import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.tools.rumen.LoggedTask;
import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
@ -89,7 +84,6 @@ import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
@ -138,13 +132,8 @@ public class SLSRunner extends Configured implements Tool {
// logger
public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
private final static int DEFAULT_MAPPER_PRIORITY = 20;
private final static int DEFAULT_REDUCER_PRIORITY = 10;
private static boolean exitAtTheFinish = false;
private static final String DEFAULT_USER = "default";
/**
* The type of trace in input.
*/
@ -472,7 +461,10 @@ public class SLSRunner extends Configured implements Tool {
while (jobIter.hasNext()) {
try {
createAMForJob(jobIter.next());
Map jsonJob = jobIter.next();
AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(
jsonJob, this);
startAMs(amDef);
} catch (Exception e) {
LOG.error("Failed to create an AM: {}", e.getMessage());
}
@ -480,150 +472,29 @@ public class SLSRunner extends Configured implements Tool {
}
}
private void createAMForJob(Map jsonJob) throws YarnException {
long jobStartTime = Long.parseLong(
jsonJob.get(SLSConfiguration.JOB_START_MS).toString());
long jobFinishTime = 0;
if (jsonJob.containsKey(SLSConfiguration.JOB_END_MS)) {
jobFinishTime = Long.parseLong(
jsonJob.get(SLSConfiguration.JOB_END_MS).toString());
}
String jobLabelExpr = null;
if (jsonJob.containsKey(SLSConfiguration.JOB_LABEL_EXPR)) {
jobLabelExpr = jsonJob.get(SLSConfiguration.JOB_LABEL_EXPR).toString();
}
String user = (String) jsonJob.get(SLSConfiguration.JOB_USER);
if (user == null) {
user = "default";
}
String queue = jsonJob.get(SLSConfiguration.JOB_QUEUE_NAME).toString();
increaseQueueAppNum(queue);
String amType = (String)jsonJob.get(SLSConfiguration.AM_TYPE);
if (amType == null) {
amType = SLSUtils.DEFAULT_JOB_TYPE;
}
int jobCount = 1;
if (jsonJob.containsKey(SLSConfiguration.JOB_COUNT)) {
jobCount = Integer.parseInt(
jsonJob.get(SLSConfiguration.JOB_COUNT).toString());
}
jobCount = Math.max(jobCount, 1);
String oldAppId = (String)jsonJob.get(SLSConfiguration.JOB_ID);
// Job id is generated automatically if this job configuration allows
// multiple job instances
if(jobCount > 1) {
oldAppId = null;
}
for (int i = 0; i < jobCount; i++) {
runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
getTaskContainers(jsonJob), getAMContainerResource(jsonJob),
jobLabelExpr);
private void startAMs(AMDefinition amDef) {
for (int i = 0; i < amDef.getJobCount(); i++) {
JobDefinition jobDef = JobDefinition.Builder.create()
.withAmDefinition(amDef)
.withDeadline(-1)
.withReservationId(null)
.withParams(null)
.build();
runNewAM(jobDef);
}
}
private List<ContainerSimulator> getTaskContainers(Map jsonJob)
throws YarnException {
List<ContainerSimulator> containers = new ArrayList<>();
List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
if (tasks == null || tasks.size() == 0) {
throw new YarnException("No task for the job!");
private void startAMs(AMDefinition amDef, ReservationId reservationId,
Map<String, String> params, long deadline) {
for (int i = 0; i < amDef.getJobCount(); i++) {
JobDefinition jobDef = JobDefinition.Builder.create()
.withAmDefinition(amDef)
.withReservationId(reservationId)
.withParams(params)
.withDeadline(deadline)
.build();
runNewAM(jobDef);
}
for (Object o : tasks) {
Map jsonTask = (Map) o;
String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
long duration = 0;
if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) {
duration = Integer.parseInt(
jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString());
} else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) {
// Also support "duration.ms" for backward compatibility
duration = Integer.parseInt(
jsonTask.get(SLSConfiguration.DURATION_MS).toString());
} else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) &&
jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) {
long taskStart = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_START_MS).toString());
long taskFinish = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_END_MS).toString());
duration = taskFinish - taskStart;
}
if (duration <= 0) {
throw new YarnException("Duration of a task shouldn't be less or equal"
+ " to 0!");
}
Resource res = getResourceForContainer(jsonTask);
int priority = DEFAULT_MAPPER_PRIORITY;
if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) {
priority = Integer.parseInt(
jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString());
}
String type = "map";
if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) {
type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString();
}
int count = 1;
if (jsonTask.containsKey(SLSConfiguration.COUNT)) {
count = Integer.parseInt(
jsonTask.get(SLSConfiguration.COUNT).toString());
}
count = Math.max(count, 1);
ExecutionType executionType = ExecutionType.GUARANTEED;
if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) {
executionType = ExecutionType.valueOf(
jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString());
}
long allocationId = -1;
if (jsonTask.containsKey(SLSConfiguration.TASK_ALLOCATION_ID)) {
allocationId = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_ALLOCATION_ID).toString());
}
long requestDelay = 0;
if (jsonTask.containsKey(SLSConfiguration.TASK_REQUEST_DELAY)) {
requestDelay = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_REQUEST_DELAY).toString());
}
requestDelay = Math.max(requestDelay, 0);
for (int i = 0; i < count; i++) {
containers.add(
new ContainerSimulator(res, duration, hostname, priority, type,
executionType, allocationId, requestDelay));
}
}
return containers;
}
private Resource getResourceForContainer(Map jsonTask) {
Resource res = getDefaultContainerResource();
ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
for (ResourceInformation info : infors) {
if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) {
long value = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName())
.toString());
res.setResourceValue(info.getName(), value);
}
}
return res;
}
/**
@ -642,76 +513,19 @@ public class SLSRunner extends Configured implements Tool {
while (job != null) {
try {
createAMForJob(job, baselineTimeMS);
AMDefinitionRumen amDef =
AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS,
this);
startAMs(amDef);
} catch (Exception e) {
LOG.error("Failed to create an AM", e);
}
job = reader.getNext();
}
}
}
private void createAMForJob(LoggedJob job, long baselineTimeMs)
throws YarnException {
String user = job.getUser() == null ? "default" :
job.getUser().getValue();
String jobQueue = job.getQueue().getValue();
String oldJobId = job.getJobID().toString();
long jobStartTimeMS = job.getSubmitTime();
long jobFinishTimeMS = job.getFinishTime();
if (baselineTimeMs == 0) {
baselineTimeMs = job.getSubmitTime();
}
jobStartTimeMS -= baselineTimeMs;
jobFinishTimeMS -= baselineTimeMs;
if (jobStartTimeMS < 0) {
LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
jobStartTimeMS = 0;
}
increaseQueueAppNum(jobQueue);
List<ContainerSimulator> containerList = new ArrayList<>();
// mapper
for (LoggedTask mapTask : job.getMapTasks()) {
if (mapTask.getAttempts().size() == 0) {
throw new YarnException("Invalid map task, no attempt for a mapper!");
}
LoggedTaskAttempt taskAttempt =
mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
String hostname = taskAttempt.getHostName().getValue();
long containerLifeTime = taskAttempt.getFinishTime() -
taskAttempt.getStartTime();
containerList.add(
new ContainerSimulator(getDefaultContainerResource(),
containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
}
// reducer
for (LoggedTask reduceTask : job.getReduceTasks()) {
if (reduceTask.getAttempts().size() == 0) {
throw new YarnException(
"Invalid reduce task, no attempt for a reducer!");
}
LoggedTaskAttempt taskAttempt =
reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1);
String hostname = taskAttempt.getHostName().getValue();
long containerLifeTime = taskAttempt.getFinishTime() -
taskAttempt.getStartTime();
containerList.add(
new ContainerSimulator(getDefaultContainerResource(),
containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
}
// Only supports the default job type currently
runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList,
getAMContainerResource(null));
}
private Resource getDefaultContainerResource() {
Resource getDefaultContainerResource() {
int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
@ -726,94 +540,26 @@ public class SLSRunner extends Configured implements Tool {
private void startAMFromSynthGenerator() throws YarnException, IOException {
Configuration localConf = new Configuration();
localConf.set("fs.defaultFS", "file:///");
long baselineTimeMS = 0;
// if we use the nodeFile this could have been not initialized yet.
if (stjp == null) {
stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
}
SynthJob job = null;
SynthJob job;
// we use stjp, a reference to the job producer instantiated during node
// creation
while ((job = (SynthJob) stjp.getNextJob()) != null) {
// only support MapReduce currently
String user = job.getUser() == null ? DEFAULT_USER :
job.getUser();
String jobQueue = job.getQueueName();
String oldJobId = job.getJobID().toString();
long jobStartTimeMS = job.getSubmissionTime();
// CARLO: Finish time is only used for logging, omit for now
long jobFinishTimeMS = jobStartTimeMS + job.getDuration();
if (baselineTimeMS == 0) {
baselineTimeMS = jobStartTimeMS;
}
jobStartTimeMS -= baselineTimeMS;
jobFinishTimeMS -= baselineTimeMS;
if (jobStartTimeMS < 0) {
LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
jobStartTimeMS = 0;
}
increaseQueueAppNum(jobQueue);
List<ContainerSimulator> containerList =
new ArrayList<ContainerSimulator>();
ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
Random rand = new Random(stjp.getSeed());
for (SynthJob.SynthTask task : job.getTasks()) {
RMNode node = nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
.getNode();
String hostname = "/" + node.getRackName() + "/" + node.getHostName();
long containerLifeTime = task.getTime();
Resource containerResource = Resource
.newInstance((int) task.getMemory(), (int) task.getVcores());
containerList.add(
new ContainerSimulator(containerResource, containerLifeTime,
hostname, task.getPriority(), task.getType(),
task.getExecutionType()));
}
ReservationId reservationId = null;
if(job.hasDeadline()){
if (job.hasDeadline()) {
reservationId = ReservationId
.newInstance(this.rm.getStartTime(), AM_ID);
.newInstance(rm.getStartTime(), AM_ID);
}
runNewAM(job.getType(), user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
job.getDeadline(), getAMContainerResource(null), null,
job.getParams());
AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, this);
startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
}
}
private Resource getAMContainerResource(Map jsonJob) {
Resource amContainerResource =
SLSConfiguration.getAMContainerResource(getConf());
if (jsonJob == null) {
return amContainerResource;
}
ResourceInformation[] infors = ResourceUtils.getResourceTypesArray();
for (ResourceInformation info : infors) {
String key = SLSConfiguration.JOB_AM_PREFIX + info.getName();
if (jsonJob.containsKey(key)) {
long value = Long.parseLong(jsonJob.get(key).toString());
amContainerResource.setResourceValue(info.getName(), value);
}
}
return amContainerResource;
}
private void increaseQueueAppNum(String queue) throws YarnException {
void increaseQueueAppNum(String queue) throws YarnException {
SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
String queueName = wrapper.getRealQueueName(queue);
Integer appNum = queueAppNumMap.get(queueName);
@ -830,32 +576,16 @@ public class SLSRunner extends Configured implements Tool {
}
}
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
Resource amContainerResource) {
runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
jobFinishTimeMS, containerList, null, -1,
amContainerResource, null, null);
}
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
Resource amContainerResource, String labelExpr) {
runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
jobFinishTimeMS, containerList, null, -1,
amContainerResource, labelExpr, null);
}
@SuppressWarnings("checkstyle:parameternumber")
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
ReservationId reservationId, long deadline, Resource amContainerResource,
String labelExpr, Map<String, String> params) {
AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
private AMSimulator createAmSimulator(String jobType) {
return (AMSimulator) ReflectionUtils.newInstance(
amClassMap.get(jobType), new Configuration());
}
private void runNewAM(JobDefinition jobDef) {
AMDefinition amDef = jobDef.getAmDefinition();
String oldJobId = amDef.getOldAppId();
AMSimulator amSim =
createAmSimulator(amDef.getAmType());
if (amSim != null) {
int heartbeatInterval = getConf().getInt(
@ -867,19 +597,17 @@ public class SLSRunner extends Configured implements Tool {
oldJobId = Integer.toString(AM_ID);
}
AM_ID++;
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
runner.getStartTimeMS(), amContainerResource, labelExpr, params,
appIdAMSim);
if(reservationId != null) {
amSim.init(amDef, rm, this, isTracked, runner.getStartTimeMS(), heartbeatInterval, appIdAMSim);
if (jobDef.getReservationId() != null) {
// if we have a ReservationId, delegate reservation creation to
// AMSim (reservation shape is impl specific)
UTCClock clock = new UTCClock();
amSim.initReservation(reservationId, deadline, clock.getTime());
amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(),
clock.getTime());
}
runner.schedule(amSim);
maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
numTasks += containerList.size();
maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
numTasks += amDef.getTaskContainers().size();
amMap.put(oldJobId, amSim);
}
}
@ -1121,4 +849,12 @@ public class SLSRunner extends Configured implements Tool {
return result;
}
}
public ResourceManager getRm() {
return rm;
}
public SynthTraceJobProducer getStjp() {
return stjp;
}
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import java.util.Map;
import static org.apache.hadoop.yarn.sls.AMDefinitionRumen.DEFAULT_MAPPER_PRIORITY;
public class TaskContainerDefinition {
private long duration;
private Resource resource;
private int priority;
private String type;
private int count;
private ExecutionType executionType;
private long allocationId = -1;
private long requestDelay = 0;
private String hostname;
public long getDuration() {
return duration;
}
public Resource getResource() {
return resource;
}
public int getPriority() {
return priority;
}
public String getType() {
return type;
}
public int getCount() {
return count;
}
public ExecutionType getExecutionType() {
return executionType;
}
public long getAllocationId() {
return allocationId;
}
public long getRequestDelay() {
return requestDelay;
}
public String getHostname() {
return hostname;
}
public static final class Builder {
private long duration = -1;
private long durationLegacy = -1;
private long taskStart = -1;
private long taskFinish = -1;
private Resource resource;
private int priority = DEFAULT_MAPPER_PRIORITY;
private String type = "map";
private int count = 1;
private ExecutionType executionType = ExecutionType.GUARANTEED;
private long allocationId = -1;
private long requestDelay = 0;
private String hostname;
public static Builder create() {
return new Builder();
}
public Builder withDuration(Map<?, ?> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
this.duration = Integer.parseInt(jsonTask.get(key).toString());
}
return this;
}
public Builder withDuration(long duration) {
this.duration = duration;
return this;
}
/**
* Also support "duration.ms" for backward compatibility.
* @param jsonTask the json representation of the task.
* @param key The json key.
* @return the builder
*/
public Builder withDurationLegacy(Map<?, ?> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
this.durationLegacy = Integer.parseInt(jsonTask.get(key).toString());
}
return this;
}
public Builder withTaskStart(Map<?, ?> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
this.taskStart = Long.parseLong(jsonTask.get(key).toString());
}
return this;
}
public Builder withTaskFinish(Map<?, ?> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
this.taskFinish = Long.parseLong(jsonTask.get(key).toString());
}
return this;
}
public Builder withResource(Resource resource) {
this.resource = resource;
return this;
}
public Builder withPriority(Map<?, ?> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
this.priority = Integer.parseInt(jsonTask.get(key).toString());
}
return this;
}
public Builder withPriority(int priority) {
this.priority = priority;
return this;
}
public Builder withType(Map<?, ?> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
this.type = jsonTask.get(key).toString();
}
return this;
}
public Builder withType(String type) {
this.type = type;
return this;
}
public Builder withCount(Map<?, ?> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
count = Integer.parseInt(jsonTask.get(key).toString());
count = Math.max(count, 1);
}
return this;
}
public Builder withCount(int count) {
this.count = count;
return this;
}
public Builder withExecutionType(Map<?, ?> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
this.executionType = ExecutionType.valueOf(
jsonTask.get(key).toString());
}
return this;
}
public Builder withExecutionType(ExecutionType executionType) {
this.executionType = executionType;
return this;
}
public Builder withAllocationId(Map<?, ?> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
this.allocationId = Long.parseLong(jsonTask.get(key).toString());
}
return this;
}
public Builder withAllocationId(long allocationId) {
this.allocationId = allocationId;
return this;
}
public Builder withRequestDelay(Map<?, ?> jsonTask, String key) {
if (jsonTask.containsKey(key)) {
requestDelay = Long.parseLong(jsonTask.get(key).toString());
requestDelay = Math.max(requestDelay, 0);
}
return this;
}
public Builder withRequestDelay(long requestDelay) {
this.requestDelay = requestDelay;
return this;
}
public Builder withHostname(String hostname) {
this.hostname = hostname;
return this;
}
public TaskContainerDefinition build() throws YarnException {
TaskContainerDefinition taskContainerDef =
new TaskContainerDefinition();
taskContainerDef.duration = validateAndGetDuration(this);
taskContainerDef.resource = this.resource;
taskContainerDef.type = this.type;
taskContainerDef.requestDelay = this.requestDelay;
taskContainerDef.priority = this.priority;
taskContainerDef.count = this.count;
taskContainerDef.allocationId = this.allocationId;
taskContainerDef.executionType = this.executionType;
taskContainerDef.hostname = this.hostname;
return taskContainerDef;
}
private long validateAndGetDuration(Builder builder) throws YarnException {
long duration = 0;
if (builder.duration != -1) {
duration = builder.duration;
} else if (builder.durationLegacy != -1) {
duration = builder.durationLegacy;
} else if (builder.taskStart != -1 && builder.taskFinish != -1) {
duration = builder.taskFinish - builder.taskStart;
}
if (duration <= 0) {
throw new YarnException("Duration of a task shouldn't be less or equal"
+ " to 0!");
}
return duration;
}
}
}

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.AMDefinition;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
@ -128,27 +129,25 @@ public abstract class AMSimulator extends TaskRunner.Task {
this.responseQueue = new LinkedBlockingQueue<>();
}
@SuppressWarnings("checkstyle:parameternumber")
public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp, long baseTimeMS,
Resource amResource, String nodeLabelExpr, Map<String, String> params,
Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval);
this.user = simUser;
this.rm = resourceManager;
this.se = slsRunnner;
this.queue = simQueue;
this.oldAppId = oldApp;
public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
boolean tracked, long baselineTimeMS, long heartbeatInterval,
Map<ApplicationId, AMSimulator> appIdToAMSim) {
long startTime = amDef.getJobStartTime();
long endTime = startTime + 1000000L * heartbeatInterval;
super.init(startTime, endTime, heartbeatInterval);
this.user = amDef.getUser();
this.queue = amDef.getQueue();
this.oldAppId = amDef.getOldAppId();
this.amContainerResource = amDef.getAmResource();
this.nodeLabelExpression = amDef.getLabelExpression();
this.traceStartTimeMS = amDef.getJobStartTime();
this.traceFinishTimeMS = amDef.getJobFinishTime();
this.rm = rm;
this.se = slsRunner;
this.isTracked = tracked;
this.baselineTimeMS = baseTimeMS;
this.traceStartTimeMS = startTime;
this.traceFinishTimeMS = finishTime;
this.amContainerResource = amResource;
this.nodeLabelExpression = nodeLabelExpr;
this.appIdToAMSim = appIdAMSim;
this.baselineTimeMS = baselineTimeMS;
this.appIdToAMSim = appIdToAMSim;
}
/**

View File

@ -32,10 +32,10 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.AMDefinition;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.slf4j.Logger;
@ -93,19 +93,15 @@ public class DAGAMSimulator extends AMSimulator {
LoggerFactory.getLogger(DAGAMSimulator.class);
@SuppressWarnings("checkstyle:parameternumber")
public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp, long baseTimeMS,
Resource amResource, String nodeLabelExpr, Map<String, String> params,
Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(heartbeatInterval, containerList, resourceManager, slsRunnner,
startTime, finishTime, simUser, simQueue, tracked, oldApp, baseTimeMS,
amResource, nodeLabelExpr, params, appIdAMSim);
public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
boolean tracked, long baselineTimeMS, long heartbeatInterval,
Map<ApplicationId, AMSimulator> appIdToAMSim) {
super.init(amDef, rm, slsRunner, tracked, baselineTimeMS, heartbeatInterval,
appIdToAMSim);
super.amtype = "dag";
allContainers.addAll(containerList);
pendingContainers.addAll(containerList);
allContainers.addAll(amDef.getTaskContainers());
pendingContainers.addAll(amDef.getTaskContainers());
totalContainers = allContainers.size();
LOG.info("Added new job with {} containers", allContainers.size());

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.AMDefinition;
import org.apache.hadoop.yarn.sls.ReservationClientUtil;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.SLSRunner;
@ -123,19 +124,15 @@ public class MRAMSimulator extends AMSimulator {
LoggerFactory.getLogger(MRAMSimulator.class);
@SuppressWarnings("checkstyle:parameternumber")
public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, String nodeLabelExpr,
Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
amContainerResource, nodeLabelExpr, params, appIdAMSim);
public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
boolean tracked, long baselineTimeMS, long heartbeatInterval,
Map<ApplicationId, AMSimulator> appIdToAMSim) {
super.init(amDef, rm, slsRunner, tracked, baselineTimeMS,
heartbeatInterval, appIdToAMSim);
amtype = "mapreduce";
// get map/reduce tasks
for (ContainerSimulator cs : containerList) {
for (ContainerSimulator cs : amDef.getTaskContainers()) {
if (cs.getType().equals("map")) {
cs.setPriority(PRIORITY_MAP);
allMaps.add(cs);

View File

@ -30,11 +30,11 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.AMDefinition;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.slf4j.Logger;
@ -93,21 +93,14 @@ public class StreamAMSimulator extends AMSimulator {
LoggerFactory.getLogger(StreamAMSimulator.class);
@SuppressWarnings("checkstyle:parameternumber")
public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, String nodeLabelExpr,
Map<String, String> params, Map<ApplicationId, AMSimulator> appIdAMSim) {
super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
amContainerResource, nodeLabelExpr, params, appIdAMSim);
public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner,
boolean tracked, long baselineTimeMS, long heartbeatInterval,
Map<ApplicationId, AMSimulator> appIdToAMSim) {
super.init(amDef, rm, slsRunner, tracked, baselineTimeMS,
heartbeatInterval, appIdToAMSim);
amtype = "stream";
allStreams.addAll(containerList);
duration = traceFinishTime - traceStartTime;
allStreams.addAll(amDef.getTaskContainers());
duration = amDef.getJobFinishTime() - amDef.getJobStartTime();
LOG.info("Added new job with {} streams, running for {}",
allStreams.size(), duration);
}

View File

@ -26,54 +26,41 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.sls.TaskContainerDefinition;
@Private
@Unstable
public class ContainerSimulator implements Delayed {
// id
private ContainerId id;
// resource allocated
private Resource resource;
// end time
private long endTime;
// life time (ms)
private long lifeTime;
// time(ms) after which container would be requested by AM
private long requestDelay;
// host name
private String hostname;
// priority
private int priority;
// type
private String type;
// execution type
private ExecutionType executionType = ExecutionType.GUARANTEED;
// allocation id
private long allocationId;
/**
* invoked when AM schedules containers to allocate.
* Invoked when AM schedules containers to allocate.
* @param def The task's definition object.
* @return ContainerSimulator object
*/
public ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type) {
this(resource, lifeTime, hostname, priority, type,
ExecutionType.GUARANTEED);
public static ContainerSimulator createFromTaskContainerDefinition(
TaskContainerDefinition def) {
return new ContainerSimulator(def.getResource(), def.getDuration(),
def.getHostname(), def.getPriority(), def.getType(),
def.getExecutionType(), def.getAllocationId(), def.getRequestDelay());
}
/**
* invoked when AM schedules containers to allocate.
*/
public ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type, ExecutionType executionType) {
this(resource, lifeTime, hostname, priority, type,
executionType, -1, 0);
}
/**
* invoked when AM schedules containers to allocate.
* Invoked when AM schedules containers to allocate.
*/
@SuppressWarnings("checkstyle:parameternumber")
public ContainerSimulator(Resource resource, long lifeTime,
private ContainerSimulator(Resource resource, long lifeTime,
String hostname, int priority, String type, ExecutionType executionType,
long allocationId, long requestDelay) {
this.resource = resource;
@ -87,7 +74,7 @@ public class ContainerSimulator implements Delayed {
}
/**
* invoke when NM schedules containers to run.
* Invoked when NM schedules containers to run.
*/
public ContainerSimulator(ContainerId id, Resource resource, long endTime,
long lifeTime, long allocationId) {

View File

@ -57,8 +57,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
public class SLSUtils {
public final static String DEFAULT_JOB_TYPE = "mapreduce";
private static final String LABEL_FORMAT_ERR_MSG =
"Input format for adding node-labels is not correct, it should be "
+ "labelName1[(exclusive=true/false)],labelName2[] ..";

View File

@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for DagAMSimulator.
@ -74,7 +76,17 @@ public class TestDagAMSimulator {
private ContainerSimulator createContainerSim(long allocationId,
long requestDelay) {
return new ContainerSimulator(null, 1000, "*", 1, "Map",
null, allocationId, requestDelay);
TaskContainerDefinition taskContainerDef =
mock(TaskContainerDefinition.class);
when(taskContainerDef.getResource()).thenReturn(null);
when(taskContainerDef.getDuration()).thenReturn(1000L);
when(taskContainerDef.getHostname()).thenReturn("*");
when(taskContainerDef.getPriority()).thenReturn(1);
when(taskContainerDef.getType()).thenReturn("Map");
when(taskContainerDef.getExecutionType()).thenReturn(null);
when(taskContainerDef.getAllocationId()).thenReturn(allocationId);
when(taskContainerDef.getRequestDelay()).thenReturn(requestDelay);
return ContainerSimulator.createFromTaskContainerDefinition(
taskContainerDef);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.sls.appmaster;
import com.codahale.metrics.MetricRegistry;
import java.util.HashMap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.tools.rumen.datatypes.UserName;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -33,6 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.sls.AMDefinitionRumen;
import org.apache.hadoop.yarn.sls.TaskContainerDefinition;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
@ -57,6 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
@ -157,9 +161,20 @@ public class TestAMSimulator {
String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>();
HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null,
map);
UserName mockUser = mock(UserName.class);
when(mockUser.getValue()).thenReturn("user1");
AMDefinitionRumen amDef =
AMDefinitionRumen.Builder.create()
.withUser(mockUser)
.withQueue(queue)
.withJobId(appId)
.withJobStartTime(0)
.withJobFinishTime(1000000L)
.withAmResource(SLSConfiguration.getAMContainerResource(conf))
.withTaskContainers(containers)
.build();
app.init(amDef, rm, null, true, 0, 1000, map);
app.firstStep();
verifySchedulerMetrics(appId);
@ -184,9 +199,21 @@ public class TestAMSimulator {
String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>();
HashMap<ApplicationId, AMSimulator> map = new HashMap<>();
app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
appId, 0, SLSConfiguration.getAMContainerResource(conf), "label1",
null, map);
UserName mockUser = mock(UserName.class);
when(mockUser.getValue()).thenReturn("user1");
AMDefinitionRumen amDef =
AMDefinitionRumen.Builder.create()
.withUser(mockUser)
.withQueue(queue)
.withJobId(appId)
.withJobStartTime(0)
.withJobFinishTime(1000000L)
.withAmResource(SLSConfiguration.getAMContainerResource(conf))
.withTaskContainers(containers)
.withLabelExpression("label1")
.build();
app.init(amDef, rm, null, true, 0, 1000, map);
app.firstStep();
verifySchedulerMetrics(appId);
@ -201,7 +228,7 @@ public class TestAMSimulator {
}
@Test
public void testPackageRequests() {
public void testPackageRequests() throws YarnException {
MockAMSimulator app = new MockAMSimulator();
List<ContainerSimulator> containerSimulators = new ArrayList<>();
Resource resource = Resources.createResource(1024);
@ -209,12 +236,25 @@ public class TestAMSimulator {
ExecutionType execType = ExecutionType.GUARANTEED;
String type = "map";
ContainerSimulator s1 = new ContainerSimulator(resource, 100,
"/default-rack/h1", priority, type, execType);
ContainerSimulator s2 = new ContainerSimulator(resource, 100,
"/default-rack/h1", priority, type, execType);
ContainerSimulator s3 = new ContainerSimulator(resource, 100,
"/default-rack/h2", priority, type, execType);
TaskContainerDefinition.Builder builder =
TaskContainerDefinition.Builder.create()
.withResource(resource)
.withDuration(100)
.withPriority(1)
.withType(type)
.withExecutionType(execType)
.withAllocationId(-1)
.withRequestDelay(0);
ContainerSimulator s1 = ContainerSimulator
.createFromTaskContainerDefinition(
builder.withHostname("/default-rack/h1").build());
ContainerSimulator s2 = ContainerSimulator
.createFromTaskContainerDefinition(
builder.withHostname("/default-rack/h1").build());
ContainerSimulator s3 = ContainerSimulator
.createFromTaskContainerDefinition(
builder.withHostname("/default-rack/h2").build());
containerSimulators.add(s1);
containerSimulators.add(s2);
@ -250,12 +290,15 @@ public class TestAMSimulator {
Assert.assertEquals(2, nodeRequestCount);
containerSimulators.clear();
s1 = new ContainerSimulator(resource, 100,
"/default-rack/h1", priority, type, execType, 1, 0);
s2 = new ContainerSimulator(resource, 100,
"/default-rack/h1", priority, type, execType, 2, 0);
s3 = new ContainerSimulator(resource, 100,
"/default-rack/h2", priority, type, execType, 1, 0);
s1 = ContainerSimulator.createFromTaskContainerDefinition(
createDefaultTaskContainerDefMock(resource, priority, execType, type,
"/default-rack/h1", 1));
s2 = ContainerSimulator.createFromTaskContainerDefinition(
createDefaultTaskContainerDefMock(resource, priority, execType, type,
"/default-rack/h1", 2));
s3 = ContainerSimulator.createFromTaskContainerDefinition(
createDefaultTaskContainerDefMock(resource, priority, execType, type,
"/default-rack/h2", 1));
containerSimulators.add(s1);
containerSimulators.add(s2);
@ -317,6 +360,20 @@ public class TestAMSimulator {
Assert.assertFalse(nm.getNode().getRunningApps().contains(app.appId));
Assert.assertTrue(nm.getNode().getRunningApps().isEmpty());
}
private TaskContainerDefinition createDefaultTaskContainerDefMock(
Resource resource, int priority, ExecutionType execType, String type,
String hostname, long allocationId) {
TaskContainerDefinition taskContainerDef =
mock(TaskContainerDefinition.class);
when(taskContainerDef.getResource()).thenReturn(resource);
when(taskContainerDef.getDuration()).thenReturn(100L);
when(taskContainerDef.getPriority()).thenReturn(priority);
when(taskContainerDef.getType()).thenReturn(type);
when(taskContainerDef.getExecutionType()).thenReturn(execType);
when(taskContainerDef.getHostname()).thenReturn(hostname);
when(taskContainerDef.getAllocationId()).thenReturn(allocationId);
return taskContainerDef;
}
@After
public void tearDown() {