diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java new file mode 100644 index 00000000000..1f9e351c429 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinition.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; + +import java.util.List; + +public abstract class AMDefinition { + protected int jobCount; + protected String amType; + protected String user; + protected String queue; + protected long jobStartTime; + protected long jobFinishTime; + protected List taskContainers; + protected Resource amResource; + protected String labelExpression; + protected String oldAppId; + + public AMDefinition(AmDefinitionBuilder builder) { + this.jobStartTime = builder.jobStartTime; + this.jobFinishTime = builder.jobFinishTime; + this.amType = builder.amType; + this.taskContainers = builder.taskContainers; + this.labelExpression = builder.labelExpression; + this.user = builder.user; + this.amResource = builder.amResource; + this.queue = builder.queue; + this.jobCount = builder.jobCount; + this.oldAppId = builder.jobId; + } + + public String getAmType() { + return amType; + } + + public String getUser() { + return user; + } + + public String getOldAppId() { + return oldAppId; + } + + public long getJobStartTime() { + return jobStartTime; + } + + public long getJobFinishTime() { + return jobFinishTime; + } + + public List getTaskContainers() { + return taskContainers; + } + + public Resource getAmResource() { + return amResource; + } + + public String getLabelExpression() { + return labelExpression; + } + + public String getQueue() { + return queue; + } + + public int getJobCount() { + return jobCount; + } + + + public abstract static class AmDefinitionBuilder { + private static final String DEFAULT_USER = "default"; + + protected int jobCount = 1; + protected String amType = AMDefinitionFactory.DEFAULT_JOB_TYPE; + protected String user = DEFAULT_USER; + protected String queue; + protected String jobId; + protected long jobStartTime; + protected long jobFinishTime; + protected List taskContainers; + protected Resource amResource; + protected String labelExpression = null; + + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java new file mode 100644 index 00000000000..2bbe7bb1ad1 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionFactory.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import java.util.Map; + +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.tools.rumen.LoggedJob; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.synthetic.SynthJob; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public final class AMDefinitionFactory { + private static final Logger LOG = LoggerFactory.getLogger( + AMDefinitionFactory.class); + public final static String DEFAULT_JOB_TYPE = "mapreduce"; + + private AMDefinitionFactory() {} + + public static AMDefinitionSLS createFromSlsTrace(Map jsonJob, + SLSRunner slsRunner) throws YarnException { + AMDefinitionSLS amDefinition = AMDefinitionSLS.Builder.create(jsonJob) + .withAmType(SLSConfiguration.AM_TYPE) + .withAmResource(getAMContainerResourceSLS(jsonJob, slsRunner)) + .withTaskContainers( + AMDefinitionSLS.getTaskContainers(jsonJob, slsRunner)) + .withJobStartTime(SLSConfiguration.JOB_START_MS) + .withJobFinishTime(SLSConfiguration.JOB_END_MS) + .withLabelExpression(SLSConfiguration.JOB_LABEL_EXPR) + .withUser(SLSConfiguration.JOB_USER) + .withQueue(SLSConfiguration.JOB_QUEUE_NAME) + .withJobId(SLSConfiguration.JOB_ID) + .withJobCount(SLSConfiguration.JOB_COUNT) + .build(); + slsRunner.increaseQueueAppNum(amDefinition.getQueue()); + return amDefinition; + } + + public static AMDefinitionRumen createFromRumenTrace(LoggedJob job, + long baselineTimeMs, SLSRunner slsRunner) throws YarnException { + AMDefinitionRumen amDefinition = AMDefinitionRumen.Builder.create() + .withAmType(DEFAULT_JOB_TYPE) + .withAmResource(getAMContainerResourceSynthAndRumen(slsRunner)) + .withTaskContainers( + AMDefinitionRumen.getTaskContainers(job, slsRunner)) + .withJobStartTime(job.getSubmitTime()) + .withJobFinishTime(job.getFinishTime()) + .withBaseLineTimeMs(baselineTimeMs) + .withUser(job.getUser()) + .withQueue(job.getQueue().getValue()) + .withJobId(job.getJobID().toString()) + .build(); + slsRunner.increaseQueueAppNum(amDefinition.getQueue()); + return amDefinition; + } + + public static AMDefinitionSynth createFromSynth(SynthJob job, + SLSRunner slsRunner) throws YarnException { + AMDefinitionSynth amDefinition = + AMDefinitionSynth.Builder.create() + .withAmType(job.getType()) + .withAmResource(getAMContainerResourceSynthAndRumen(slsRunner)) + .withTaskContainers( + AMDefinitionSynth.getTaskContainers(job, slsRunner)) + .withUser(job.getUser()) + .withQueue(job.getQueueName()) + .withJobId(job.getJobID().toString()) + .withJobStartTime(job.getSubmissionTime()) + .withJobFinishTime(job.getSubmissionTime() + job.getDuration()) + .withBaseLineTimeMs(0) + .build(); + + slsRunner.increaseQueueAppNum(amDefinition.getQueue()); + return amDefinition; + } + + private static Resource getAMContainerResourceSLS(Map jsonJob, + Configured configured) { + Resource amContainerResource = + SLSConfiguration.getAMContainerResource(configured.getConf()); + if (jsonJob == null) { + return amContainerResource; + } + + ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); + for (ResourceInformation info : infors) { + String key = SLSConfiguration.JOB_AM_PREFIX + info.getName(); + if (jsonJob.containsKey(key)) { + long value = Long.parseLong(jsonJob.get(key).toString()); + amContainerResource.setResourceValue(info.getName(), value); + } + } + + return amContainerResource; + } + + private static Resource getAMContainerResourceSynthAndRumen( + Configured configured) { + return SLSConfiguration.getAMContainerResource(configured.getConf()); + } + + static void adjustTimeValuesToBaselineTime(AMDefinition amDef, + AMDefinition.AmDefinitionBuilder builder, long baselineTimeMs) { + builder.jobStartTime -= baselineTimeMs; + builder.jobFinishTime -= baselineTimeMs; + if (builder.jobStartTime < 0) { + LOG.warn("Warning: reset job {} start time to 0.", amDef.getOldAppId()); + builder.jobFinishTime = builder.jobFinishTime - builder.jobStartTime; + builder.jobStartTime = 0; + } + amDef.jobStartTime = builder.jobStartTime; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java new file mode 100644 index 00000000000..cc97a90ec00 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionRumen.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.tools.rumen.LoggedJob; +import org.apache.hadoop.tools.rumen.LoggedTask; +import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; +import org.apache.hadoop.tools.rumen.datatypes.UserName; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime; + +public class AMDefinitionRumen extends AMDefinition { + public final static int DEFAULT_MAPPER_PRIORITY = 20; + private final static int DEFAULT_REDUCER_PRIORITY = 10; + + public AMDefinitionRumen(AmDefinitionBuilder builder) { + super(builder); + } + + public static List getTaskContainers(LoggedJob job, + SLSRunner slsRunner) throws YarnException { + List containerList = new ArrayList<>(); + + TaskContainerDefinition.Builder builder = + TaskContainerDefinition.Builder.create() + .withCount(1) + .withResource(slsRunner.getDefaultContainerResource()) + .withExecutionType(ExecutionType.GUARANTEED) + .withAllocationId(-1) + .withRequestDelay(0); + + // mapper + for (LoggedTask mapTask : job.getMapTasks()) { + if (mapTask.getAttempts().size() == 0) { + throw new YarnException("Invalid map task, no attempt for a mapper!"); + } + LoggedTaskAttempt taskAttempt = + mapTask.getAttempts().get(mapTask.getAttempts().size() - 1); + TaskContainerDefinition containerDef = builder + .withHostname(taskAttempt.getHostName().getValue()) + .withDuration(taskAttempt.getFinishTime() - + taskAttempt.getStartTime()) + .withPriority(DEFAULT_MAPPER_PRIORITY) + .withType("map") + .build(); + containerList.add( + ContainerSimulator.createFromTaskContainerDefinition(containerDef)); + } + + // reducer + for (LoggedTask reduceTask : job.getReduceTasks()) { + if (reduceTask.getAttempts().size() == 0) { + throw new YarnException( + "Invalid reduce task, no attempt for a reducer!"); + } + LoggedTaskAttempt taskAttempt = + reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1); + TaskContainerDefinition containerDef = builder + .withHostname(taskAttempt.getHostName().getValue()) + .withDuration(taskAttempt.getFinishTime() - + taskAttempt.getStartTime()) + .withPriority(DEFAULT_REDUCER_PRIORITY) + .withType("reduce") + .build(); + containerList.add( + ContainerSimulator.createFromTaskContainerDefinition(containerDef)); + } + + return containerList; + } + + + public static final class Builder extends AmDefinitionBuilder { + private long baselineTimeMs; + + private Builder() { + } + + public static Builder create() { + return new Builder(); + } + + public Builder withAmType(String amType) { + this.amType = amType; + return this; + } + + public Builder withUser(UserName user) { + if (user != null) { + this.user = user.getValue(); + } + return this; + } + + public Builder withQueue(String queue) { + this.queue = queue; + return this; + } + + public Builder withJobId(String oldJobId) { + this.jobId = oldJobId; + return this; + } + + public Builder withJobStartTime(long time) { + this.jobStartTime = time; + return this; + } + + public Builder withJobFinishTime(long time) { + this.jobFinishTime = time; + return this; + } + + public Builder withBaseLineTimeMs(long baselineTimeMs) { + this.baselineTimeMs = baselineTimeMs; + return this; + } + + public Builder withLabelExpression(String expr) { + this.labelExpression = expr; + return this; + } + + public AMDefinitionRumen.Builder withTaskContainers( + List taskContainers) { + this.taskContainers = taskContainers; + return this; + } + + public AMDefinitionRumen.Builder withAmResource(Resource amResource) { + this.amResource = amResource; + return this; + } + + public AMDefinitionRumen build() { + AMDefinitionRumen amDef = new AMDefinitionRumen(this); + + if (baselineTimeMs == 0) { + baselineTimeMs = jobStartTime; + } + adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs); + return amDef; + } + + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java new file mode 100644 index 00000000000..7439ddf8cee --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSLS.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class AMDefinitionSLS extends AMDefinition { + public AMDefinitionSLS(AmDefinitionBuilder builder) { + super(builder); + } + + public String getQueue() { + return queue; + } + + public static List getTaskContainers(Map jsonJob, + SLSRunner slsRunner) throws YarnException { + List> tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS); + if (tasks == null || tasks.size() == 0) { + throw new YarnException("No task for the job!"); + } + + List containers = new ArrayList<>(); + for (Map jsonTask : tasks) { + TaskContainerDefinition containerDef = + TaskContainerDefinition.Builder.create() + .withCount(jsonTask, SLSConfiguration.COUNT) + .withHostname((String) jsonTask.get(SLSConfiguration.TASK_HOST)) + .withDuration(jsonTask, SLSConfiguration.TASK_DURATION_MS) + .withDurationLegacy(jsonTask, SLSConfiguration.DURATION_MS) + .withTaskStart(jsonTask, SLSConfiguration.TASK_START_MS) + .withTaskFinish(jsonTask, SLSConfiguration.TASK_END_MS) + .withResource(getResourceForContainer(jsonTask, slsRunner)) + .withPriority(jsonTask, SLSConfiguration.TASK_PRIORITY) + .withType(jsonTask, SLSConfiguration.TASK_TYPE) + .withExecutionType(jsonTask, SLSConfiguration.TASK_EXECUTION_TYPE) + .withAllocationId(jsonTask, SLSConfiguration.TASK_ALLOCATION_ID) + .withRequestDelay(jsonTask, SLSConfiguration.TASK_REQUEST_DELAY) + .build(); + + for (int i = 0; i < containerDef.getCount(); i++) { + containers.add(ContainerSimulator. + createFromTaskContainerDefinition(containerDef)); + } + } + return containers; + } + + private static Resource getResourceForContainer(Map jsonTask, + SLSRunner slsRunner) { + Resource res = slsRunner.getDefaultContainerResource(); + ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); + for (ResourceInformation info : infors) { + if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) { + long value = Long.parseLong( + jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName()) + .toString()); + res.setResourceValue(info.getName(), value); + } + } + return res; + } + + public static final class Builder extends AmDefinitionBuilder { + private final Map jsonJob; + + private Builder(Map jsonJob) { + this.jsonJob = jsonJob; + } + + public static Builder create(Map jsonJob) { + return new Builder(jsonJob); + } + + public Builder withAmType(String key) { + if (jsonJob.containsKey(key)) { + String amType = (String) jsonJob.get(key); + if (amType != null) { + this.amType = amType; + } + } + return this; + } + + public Builder withUser(String key) { + if (jsonJob.containsKey(key)) { + String user = (String) jsonJob.get(key); + if (user != null) { + this.user = user; + } + } + return this; + } + + public Builder withQueue(String key) { + if (jsonJob.containsKey(key)) { + this.queue = jsonJob.get(key).toString(); + } + return this; + } + + public Builder withJobId(String key) { + if (jsonJob.containsKey(key)) { + this.jobId = (String) jsonJob.get(key); + } + return this; + } + + public Builder withJobCount(String key) { + if (jsonJob.containsKey(key)) { + jobCount = Integer.parseInt(jsonJob.get(key).toString()); + jobCount = Math.max(jobCount, 1); + } + return this; + } + + public Builder withJobStartTime(String key) { + if (jsonJob.containsKey(key)) { + this.jobStartTime = Long.parseLong(jsonJob.get(key).toString()); + } + return this; + } + + public Builder withJobFinishTime(String key) { + if (jsonJob.containsKey(key)) { + this.jobFinishTime = Long.parseLong(jsonJob.get(key).toString()); + } + return this; + } + + public Builder withLabelExpression(String key) { + if (jsonJob.containsKey(key)) { + this.labelExpression = jsonJob.get(key).toString(); + } + return this; + } + + public AMDefinitionSLS.Builder withTaskContainers( + List taskContainers) { + this.taskContainers = taskContainers; + return this; + } + + public AMDefinitionSLS.Builder withAmResource(Resource amResource) { + this.amResource = amResource; + return this; + } + + public AMDefinitionSLS build() { + AMDefinitionSLS amDef = new AMDefinitionSLS(this); + // Job id is generated automatically if this job configuration allows + // multiple job instances + if (jobCount > 1) { + amDef.oldAppId = null; + } else { + amDef.oldAppId = jobId; + } + amDef.jobCount = jobCount; + return amDef; + } + } + +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java new file mode 100644 index 00000000000..db736f06f72 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMDefinitionSynth.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import static org.apache.hadoop.yarn.sls.AMDefinitionFactory.adjustTimeValuesToBaselineTime; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; +import org.apache.hadoop.yarn.sls.synthetic.SynthJob; + +public class AMDefinitionSynth extends AMDefinition { + public AMDefinitionSynth(AmDefinitionBuilder builder) { + super(builder); + } + + public static List getTaskContainers( + SynthJob job, SLSRunner slsRunner) throws YarnException { + List containerList = new ArrayList<>(); + ArrayList keyAsArray = new ArrayList<>( + slsRunner.getNmMap().keySet()); + Random rand = new Random(slsRunner.getStjp().getSeed()); + + for (SynthJob.SynthTask task : job.getTasks()) { + RMNode node = getRandomNode(slsRunner, keyAsArray, rand); + TaskContainerDefinition containerDef = + TaskContainerDefinition.Builder.create() + .withCount(1) + .withHostname("/" + node.getRackName() + "/" + node.getHostName()) + .withDuration(task.getTime()) + .withResource(Resource + .newInstance((int) task.getMemory(), (int) task.getVcores())) + .withPriority(task.getPriority()) + .withType(task.getType()) + .withExecutionType(task.getExecutionType()) + .withAllocationId(-1) + .withRequestDelay(0) + .build(); + containerList.add( + ContainerSimulator.createFromTaskContainerDefinition(containerDef)); + } + + return containerList; + } + + private static RMNode getRandomNode(SLSRunner slsRunner, + ArrayList keyAsArray, Random rand) { + int randomIndex = rand.nextInt(keyAsArray.size()); + return slsRunner.getNmMap().get(keyAsArray.get(randomIndex)).getNode(); + } + + public static final class Builder extends AmDefinitionBuilder { + private long baselineTimeMs; + + private Builder() { + } + + public static Builder create() { + return new Builder(); + } + + public Builder withAmType(String amType) { + this.amType = amType; + return this; + } + + public Builder withUser(String user) { + if (user != null) { + this.user = user; + } + return this; + } + + public Builder withQueue(String queue) { + this.queue = queue; + return this; + } + + public Builder withJobId(String oldJobId) { + this.jobId = oldJobId; + return this; + } + + public Builder withJobStartTime(long time) { + this.jobStartTime = time; + return this; + } + + public Builder withJobFinishTime(long time) { + this.jobFinishTime = time; + return this; + } + + public Builder withBaseLineTimeMs(long baselineTimeMs) { + this.baselineTimeMs = baselineTimeMs; + return this; + } + + public AMDefinitionSynth.Builder withLabelExpression(String expr) { + this.labelExpression = expr; + return this; + } + + public AMDefinitionSynth.Builder withTaskContainers( + List taskContainers) { + this.taskContainers = taskContainers; + return this; + } + + public AMDefinitionSynth.Builder withAmResource(Resource amResource) { + this.amResource = amResource; + return this; + } + + public AMDefinitionSynth build() { + AMDefinitionSynth amDef = new AMDefinitionSynth(this); + + if (baselineTimeMs == 0) { + baselineTimeMs = jobStartTime; + } + adjustTimeValuesToBaselineTime(amDef, this, baselineTimeMs); + return amDef; + } + } + +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java new file mode 100644 index 00000000000..4a39d3710c9 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/JobDefinition.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import java.util.Map; + +public class JobDefinition { + private AMDefinition amDefinition; + private ReservationId reservationId; + private long deadline; + private Map params; + + public AMDefinition getAmDefinition() { + return amDefinition; + } + + public ReservationId getReservationId() { + return reservationId; + } + + public long getDeadline() { + return deadline; + } + + //Currently unused + public Map getParams() { + return params; + } + + public static final class Builder { + private AMDefinition amDefinition; + private ReservationId reservationId; + private long deadline; + private Map params; + + private Builder() { + } + + public static Builder create() { + return new Builder(); + } + + public Builder withAmDefinition(AMDefinition amDefinition) { + this.amDefinition = amDefinition; + return this; + } + + public Builder withReservationId(ReservationId reservationId) { + this.reservationId = reservationId; + return this; + } + + public Builder withDeadline(long deadline) { + this.deadline = deadline; + return this; + } + + public Builder withParams(Map params) { + this.params = params; + return this; + } + + public JobDefinition build() { + JobDefinition jobDef = new JobDefinition(); + jobDef.params = this.params; + jobDef.amDefinition = this.amDefinition; + jobDef.reservationId = this.reservationId; + jobDef.deadline = this.deadline; + return jobDef; + } + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index a8d2aa6584b..83834e8f9c9 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -23,12 +23,10 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.security.Security; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; @@ -59,13 +57,10 @@ import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.TableMapping; import org.apache.hadoop.tools.rumen.JobTraceReader; import org.apache.hadoop.tools.rumen.LoggedJob; -import org.apache.hadoop.tools.rumen.LoggedTask; -import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeState; @@ -89,7 +84,6 @@ import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler; -import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; import org.apache.hadoop.yarn.sls.synthetic.SynthJob; import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; @@ -138,13 +132,8 @@ public class SLSRunner extends Configured implements Tool { // logger public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class); - private final static int DEFAULT_MAPPER_PRIORITY = 20; - private final static int DEFAULT_REDUCER_PRIORITY = 10; - private static boolean exitAtTheFinish = false; - private static final String DEFAULT_USER = "default"; - /** * The type of trace in input. */ @@ -472,7 +461,10 @@ public class SLSRunner extends Configured implements Tool { while (jobIter.hasNext()) { try { - createAMForJob(jobIter.next()); + Map jsonJob = jobIter.next(); + AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace( + jsonJob, this); + startAMs(amDef); } catch (Exception e) { LOG.error("Failed to create an AM: {}", e.getMessage()); } @@ -480,150 +472,29 @@ public class SLSRunner extends Configured implements Tool { } } - private void createAMForJob(Map jsonJob) throws YarnException { - long jobStartTime = Long.parseLong( - jsonJob.get(SLSConfiguration.JOB_START_MS).toString()); - - long jobFinishTime = 0; - if (jsonJob.containsKey(SLSConfiguration.JOB_END_MS)) { - jobFinishTime = Long.parseLong( - jsonJob.get(SLSConfiguration.JOB_END_MS).toString()); - } - - String jobLabelExpr = null; - if (jsonJob.containsKey(SLSConfiguration.JOB_LABEL_EXPR)) { - jobLabelExpr = jsonJob.get(SLSConfiguration.JOB_LABEL_EXPR).toString(); - } - - String user = (String) jsonJob.get(SLSConfiguration.JOB_USER); - if (user == null) { - user = "default"; - } - - String queue = jsonJob.get(SLSConfiguration.JOB_QUEUE_NAME).toString(); - increaseQueueAppNum(queue); - - String amType = (String)jsonJob.get(SLSConfiguration.AM_TYPE); - if (amType == null) { - amType = SLSUtils.DEFAULT_JOB_TYPE; - } - - int jobCount = 1; - if (jsonJob.containsKey(SLSConfiguration.JOB_COUNT)) { - jobCount = Integer.parseInt( - jsonJob.get(SLSConfiguration.JOB_COUNT).toString()); - } - jobCount = Math.max(jobCount, 1); - - String oldAppId = (String)jsonJob.get(SLSConfiguration.JOB_ID); - // Job id is generated automatically if this job configuration allows - // multiple job instances - if(jobCount > 1) { - oldAppId = null; - } - - for (int i = 0; i < jobCount; i++) { - runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime, - getTaskContainers(jsonJob), getAMContainerResource(jsonJob), - jobLabelExpr); + private void startAMs(AMDefinition amDef) { + for (int i = 0; i < amDef.getJobCount(); i++) { + JobDefinition jobDef = JobDefinition.Builder.create() + .withAmDefinition(amDef) + .withDeadline(-1) + .withReservationId(null) + .withParams(null) + .build(); + runNewAM(jobDef); } } - private List getTaskContainers(Map jsonJob) - throws YarnException { - List containers = new ArrayList<>(); - List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS); - if (tasks == null || tasks.size() == 0) { - throw new YarnException("No task for the job!"); + private void startAMs(AMDefinition amDef, ReservationId reservationId, + Map params, long deadline) { + for (int i = 0; i < amDef.getJobCount(); i++) { + JobDefinition jobDef = JobDefinition.Builder.create() + .withAmDefinition(amDef) + .withReservationId(reservationId) + .withParams(params) + .withDeadline(deadline) + .build(); + runNewAM(jobDef); } - - for (Object o : tasks) { - Map jsonTask = (Map) o; - - String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST); - - long duration = 0; - if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) { - duration = Integer.parseInt( - jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString()); - } else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) { - // Also support "duration.ms" for backward compatibility - duration = Integer.parseInt( - jsonTask.get(SLSConfiguration.DURATION_MS).toString()); - } else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) && - jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) { - long taskStart = Long.parseLong( - jsonTask.get(SLSConfiguration.TASK_START_MS).toString()); - long taskFinish = Long.parseLong( - jsonTask.get(SLSConfiguration.TASK_END_MS).toString()); - duration = taskFinish - taskStart; - } - if (duration <= 0) { - throw new YarnException("Duration of a task shouldn't be less or equal" - + " to 0!"); - } - - Resource res = getResourceForContainer(jsonTask); - - int priority = DEFAULT_MAPPER_PRIORITY; - if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) { - priority = Integer.parseInt( - jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString()); - } - - String type = "map"; - if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) { - type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString(); - } - - int count = 1; - if (jsonTask.containsKey(SLSConfiguration.COUNT)) { - count = Integer.parseInt( - jsonTask.get(SLSConfiguration.COUNT).toString()); - } - count = Math.max(count, 1); - - ExecutionType executionType = ExecutionType.GUARANTEED; - if (jsonTask.containsKey(SLSConfiguration.TASK_EXECUTION_TYPE)) { - executionType = ExecutionType.valueOf( - jsonTask.get(SLSConfiguration.TASK_EXECUTION_TYPE).toString()); - } - long allocationId = -1; - if (jsonTask.containsKey(SLSConfiguration.TASK_ALLOCATION_ID)) { - allocationId = Long.parseLong( - jsonTask.get(SLSConfiguration.TASK_ALLOCATION_ID).toString()); - } - - long requestDelay = 0; - if (jsonTask.containsKey(SLSConfiguration.TASK_REQUEST_DELAY)) { - requestDelay = Long.parseLong( - jsonTask.get(SLSConfiguration.TASK_REQUEST_DELAY).toString()); - } - requestDelay = Math.max(requestDelay, 0); - - for (int i = 0; i < count; i++) { - containers.add( - new ContainerSimulator(res, duration, hostname, priority, type, - executionType, allocationId, requestDelay)); - } - } - - return containers; - } - - private Resource getResourceForContainer(Map jsonTask) { - Resource res = getDefaultContainerResource(); - ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); - for (ResourceInformation info : infors) { - if (jsonTask.containsKey(SLSConfiguration.TASK_PREFIX + info.getName())) { - long value = Long.parseLong( - jsonTask.get(SLSConfiguration.TASK_PREFIX + info.getName()) - .toString()); - res.setResourceValue(info.getName(), value); - } - } - - return res; } /** @@ -642,76 +513,19 @@ public class SLSRunner extends Configured implements Tool { while (job != null) { try { - createAMForJob(job, baselineTimeMS); + AMDefinitionRumen amDef = + AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS, + this); + startAMs(amDef); } catch (Exception e) { LOG.error("Failed to create an AM", e); } - job = reader.getNext(); } } } - private void createAMForJob(LoggedJob job, long baselineTimeMs) - throws YarnException { - String user = job.getUser() == null ? "default" : - job.getUser().getValue(); - String jobQueue = job.getQueue().getValue(); - String oldJobId = job.getJobID().toString(); - long jobStartTimeMS = job.getSubmitTime(); - long jobFinishTimeMS = job.getFinishTime(); - if (baselineTimeMs == 0) { - baselineTimeMs = job.getSubmitTime(); - } - jobStartTimeMS -= baselineTimeMs; - jobFinishTimeMS -= baselineTimeMs; - if (jobStartTimeMS < 0) { - LOG.warn("Warning: reset job {} start time to 0.", oldJobId); - jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; - jobStartTimeMS = 0; - } - - increaseQueueAppNum(jobQueue); - - List containerList = new ArrayList<>(); - // mapper - for (LoggedTask mapTask : job.getMapTasks()) { - if (mapTask.getAttempts().size() == 0) { - throw new YarnException("Invalid map task, no attempt for a mapper!"); - } - LoggedTaskAttempt taskAttempt = - mapTask.getAttempts().get(mapTask.getAttempts().size() - 1); - String hostname = taskAttempt.getHostName().getValue(); - long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); - containerList.add( - new ContainerSimulator(getDefaultContainerResource(), - containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map")); - } - - // reducer - for (LoggedTask reduceTask : job.getReduceTasks()) { - if (reduceTask.getAttempts().size() == 0) { - throw new YarnException( - "Invalid reduce task, no attempt for a reducer!"); - } - LoggedTaskAttempt taskAttempt = - reduceTask.getAttempts().get(reduceTask.getAttempts().size() - 1); - String hostname = taskAttempt.getHostName().getValue(); - long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); - containerList.add( - new ContainerSimulator(getDefaultContainerResource(), - containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce")); - } - - // Only supports the default job type currently - runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, - getAMContainerResource(null)); - } - - private Resource getDefaultContainerResource() { + Resource getDefaultContainerResource() { int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB, SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES, @@ -726,94 +540,26 @@ public class SLSRunner extends Configured implements Tool { private void startAMFromSynthGenerator() throws YarnException, IOException { Configuration localConf = new Configuration(); localConf.set("fs.defaultFS", "file:///"); - long baselineTimeMS = 0; - // if we use the nodeFile this could have been not initialized yet. if (stjp == null) { stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); } - SynthJob job = null; + SynthJob job; // we use stjp, a reference to the job producer instantiated during node // creation while ((job = (SynthJob) stjp.getNextJob()) != null) { - // only support MapReduce currently - String user = job.getUser() == null ? DEFAULT_USER : - job.getUser(); - String jobQueue = job.getQueueName(); - String oldJobId = job.getJobID().toString(); - long jobStartTimeMS = job.getSubmissionTime(); - - // CARLO: Finish time is only used for logging, omit for now - long jobFinishTimeMS = jobStartTimeMS + job.getDuration(); - - if (baselineTimeMS == 0) { - baselineTimeMS = jobStartTimeMS; - } - jobStartTimeMS -= baselineTimeMS; - jobFinishTimeMS -= baselineTimeMS; - if (jobStartTimeMS < 0) { - LOG.warn("Warning: reset job {} start time to 0.", oldJobId); - jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; - jobStartTimeMS = 0; - } - - increaseQueueAppNum(jobQueue); - - List containerList = - new ArrayList(); - ArrayList keyAsArray = new ArrayList(nmMap.keySet()); - Random rand = new Random(stjp.getSeed()); - - for (SynthJob.SynthTask task : job.getTasks()) { - RMNode node = nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))) - .getNode(); - String hostname = "/" + node.getRackName() + "/" + node.getHostName(); - long containerLifeTime = task.getTime(); - Resource containerResource = Resource - .newInstance((int) task.getMemory(), (int) task.getVcores()); - containerList.add( - new ContainerSimulator(containerResource, containerLifeTime, - hostname, task.getPriority(), task.getType(), - task.getExecutionType())); - } - - ReservationId reservationId = null; - - if(job.hasDeadline()){ + if (job.hasDeadline()) { reservationId = ReservationId - .newInstance(this.rm.getStartTime(), AM_ID); + .newInstance(rm.getStartTime(), AM_ID); } - - runNewAM(job.getType(), user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, reservationId, - job.getDeadline(), getAMContainerResource(null), null, - job.getParams()); + AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, this); + startAMs(amDef, reservationId, job.getParams(), job.getDeadline()); } } - private Resource getAMContainerResource(Map jsonJob) { - Resource amContainerResource = - SLSConfiguration.getAMContainerResource(getConf()); - - if (jsonJob == null) { - return amContainerResource; - } - - ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); - for (ResourceInformation info : infors) { - String key = SLSConfiguration.JOB_AM_PREFIX + info.getName(); - if (jsonJob.containsKey(key)) { - long value = Long.parseLong(jsonJob.get(key).toString()); - amContainerResource.setResourceValue(info.getName(), value); - } - } - - return amContainerResource; - } - - private void increaseQueueAppNum(String queue) throws YarnException { + void increaseQueueAppNum(String queue) throws YarnException { SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler(); String queueName = wrapper.getRealQueueName(queue); Integer appNum = queueAppNumMap.get(queueName); @@ -830,32 +576,16 @@ public class SLSRunner extends Configured implements Tool { } } - private void runNewAM(String jobType, String user, - String jobQueue, String oldJobId, long jobStartTimeMS, - long jobFinishTimeMS, List containerList, - Resource amContainerResource) { - runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS, - jobFinishTimeMS, containerList, null, -1, - amContainerResource, null, null); + private AMSimulator createAmSimulator(String jobType) { + return (AMSimulator) ReflectionUtils.newInstance( + amClassMap.get(jobType), new Configuration()); } - private void runNewAM(String jobType, String user, - String jobQueue, String oldJobId, long jobStartTimeMS, - long jobFinishTimeMS, List containerList, - Resource amContainerResource, String labelExpr) { - runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS, - jobFinishTimeMS, containerList, null, -1, - amContainerResource, labelExpr, null); - } - - @SuppressWarnings("checkstyle:parameternumber") - private void runNewAM(String jobType, String user, - String jobQueue, String oldJobId, long jobStartTimeMS, - long jobFinishTimeMS, List containerList, - ReservationId reservationId, long deadline, Resource amContainerResource, - String labelExpr, Map params) { - AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(jobType), new Configuration()); + private void runNewAM(JobDefinition jobDef) { + AMDefinition amDef = jobDef.getAmDefinition(); + String oldJobId = amDef.getOldAppId(); + AMSimulator amSim = + createAmSimulator(amDef.getAmType()); if (amSim != null) { int heartbeatInterval = getConf().getInt( @@ -867,19 +597,17 @@ public class SLSRunner extends Configured implements Tool { oldJobId = Integer.toString(AM_ID); } AM_ID++; - amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, - jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, - runner.getStartTimeMS(), amContainerResource, labelExpr, params, - appIdAMSim); - if(reservationId != null) { + amSim.init(amDef, rm, this, isTracked, runner.getStartTimeMS(), heartbeatInterval, appIdAMSim); + if (jobDef.getReservationId() != null) { // if we have a ReservationId, delegate reservation creation to // AMSim (reservation shape is impl specific) UTCClock clock = new UTCClock(); - amSim.initReservation(reservationId, deadline, clock.getTime()); + amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(), + clock.getTime()); } runner.schedule(amSim); - maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); - numTasks += containerList.size(); + maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime()); + numTasks += amDef.getTaskContainers().size(); amMap.put(oldJobId, amSim); } } @@ -1121,4 +849,12 @@ public class SLSRunner extends Configured implements Tool { return result; } } + + public ResourceManager getRm() { + return rm; + } + + public SynthTraceJobProducer getStjp() { + return stjp; + } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java new file mode 100644 index 00000000000..1b0cd9003b6 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/TaskContainerDefinition.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import java.util.Map; + +import static org.apache.hadoop.yarn.sls.AMDefinitionRumen.DEFAULT_MAPPER_PRIORITY; + +public class TaskContainerDefinition { + private long duration; + private Resource resource; + private int priority; + private String type; + private int count; + private ExecutionType executionType; + private long allocationId = -1; + private long requestDelay = 0; + private String hostname; + + public long getDuration() { + return duration; + } + + public Resource getResource() { + return resource; + } + + public int getPriority() { + return priority; + } + + public String getType() { + return type; + } + + public int getCount() { + return count; + } + + public ExecutionType getExecutionType() { + return executionType; + } + + public long getAllocationId() { + return allocationId; + } + + public long getRequestDelay() { + return requestDelay; + } + + public String getHostname() { + return hostname; + } + + public static final class Builder { + private long duration = -1; + private long durationLegacy = -1; + private long taskStart = -1; + private long taskFinish = -1; + private Resource resource; + private int priority = DEFAULT_MAPPER_PRIORITY; + private String type = "map"; + private int count = 1; + private ExecutionType executionType = ExecutionType.GUARANTEED; + private long allocationId = -1; + private long requestDelay = 0; + private String hostname; + + public static Builder create() { + return new Builder(); + } + + public Builder withDuration(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.duration = Integer.parseInt(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withDuration(long duration) { + this.duration = duration; + return this; + } + + /** + * Also support "duration.ms" for backward compatibility. + * @param jsonTask the json representation of the task. + * @param key The json key. + * @return the builder + */ + public Builder withDurationLegacy(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.durationLegacy = Integer.parseInt(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withTaskStart(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.taskStart = Long.parseLong(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withTaskFinish(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.taskFinish = Long.parseLong(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withResource(Resource resource) { + this.resource = resource; + return this; + } + + public Builder withPriority(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.priority = Integer.parseInt(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withPriority(int priority) { + this.priority = priority; + return this; + } + + public Builder withType(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.type = jsonTask.get(key).toString(); + } + return this; + } + + public Builder withType(String type) { + this.type = type; + return this; + } + + public Builder withCount(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + count = Integer.parseInt(jsonTask.get(key).toString()); + count = Math.max(count, 1); + } + return this; + } + + public Builder withCount(int count) { + this.count = count; + return this; + } + + public Builder withExecutionType(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.executionType = ExecutionType.valueOf( + jsonTask.get(key).toString()); + } + return this; + } + + public Builder withExecutionType(ExecutionType executionType) { + this.executionType = executionType; + return this; + } + + public Builder withAllocationId(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + this.allocationId = Long.parseLong(jsonTask.get(key).toString()); + } + return this; + } + + public Builder withAllocationId(long allocationId) { + this.allocationId = allocationId; + return this; + } + + public Builder withRequestDelay(Map jsonTask, String key) { + if (jsonTask.containsKey(key)) { + requestDelay = Long.parseLong(jsonTask.get(key).toString()); + requestDelay = Math.max(requestDelay, 0); + } + return this; + } + + public Builder withRequestDelay(long requestDelay) { + this.requestDelay = requestDelay; + return this; + } + + public Builder withHostname(String hostname) { + this.hostname = hostname; + return this; + } + + public TaskContainerDefinition build() throws YarnException { + TaskContainerDefinition taskContainerDef = + new TaskContainerDefinition(); + taskContainerDef.duration = validateAndGetDuration(this); + taskContainerDef.resource = this.resource; + taskContainerDef.type = this.type; + taskContainerDef.requestDelay = this.requestDelay; + taskContainerDef.priority = this.priority; + taskContainerDef.count = this.count; + taskContainerDef.allocationId = this.allocationId; + taskContainerDef.executionType = this.executionType; + taskContainerDef.hostname = this.hostname; + return taskContainerDef; + } + + private long validateAndGetDuration(Builder builder) throws YarnException { + long duration = 0; + + if (builder.duration != -1) { + duration = builder.duration; + } else if (builder.durationLegacy != -1) { + duration = builder.durationLegacy; + } else if (builder.taskStart != -1 && builder.taskFinish != -1) { + duration = builder.taskFinish - builder.taskStart; + } + + if (duration <= 0) { + throw new YarnException("Duration of a task shouldn't be less or equal" + + " to 0!"); + } + return duration; + } + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 922f9a2b97a..0a87a6c2070 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.AMDefinition; import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; @@ -128,27 +129,25 @@ public abstract class AMSimulator extends TaskRunner.Task { this.responseQueue = new LinkedBlockingQueue<>(); } - @SuppressWarnings("checkstyle:parameternumber") - public void init(int heartbeatInterval, - List containerList, ResourceManager resourceManager, - SLSRunner slsRunnner, long startTime, long finishTime, String simUser, - String simQueue, boolean tracked, String oldApp, long baseTimeMS, - Resource amResource, String nodeLabelExpr, Map params, - Map appIdAMSim) { - super.init(startTime, startTime + 1000000L * heartbeatInterval, - heartbeatInterval); - this.user = simUser; - this.rm = resourceManager; - this.se = slsRunnner; - this.queue = simQueue; - this.oldAppId = oldApp; + public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner, + boolean tracked, long baselineTimeMS, long heartbeatInterval, + Map appIdToAMSim) { + long startTime = amDef.getJobStartTime(); + long endTime = startTime + 1000000L * heartbeatInterval; + super.init(startTime, endTime, heartbeatInterval); + + this.user = amDef.getUser(); + this.queue = amDef.getQueue(); + this.oldAppId = amDef.getOldAppId(); + this.amContainerResource = amDef.getAmResource(); + this.nodeLabelExpression = amDef.getLabelExpression(); + this.traceStartTimeMS = amDef.getJobStartTime(); + this.traceFinishTimeMS = amDef.getJobFinishTime(); + this.rm = rm; + this.se = slsRunner; this.isTracked = tracked; - this.baselineTimeMS = baseTimeMS; - this.traceStartTimeMS = startTime; - this.traceFinishTimeMS = finishTime; - this.amContainerResource = amResource; - this.nodeLabelExpression = nodeLabelExpr; - this.appIdToAMSim = appIdAMSim; + this.baselineTimeMS = baselineTimeMS; + this.appIdToAMSim = appIdToAMSim; } /** diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java index 83467e0e5cf..418408db9fb 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java @@ -32,10 +32,10 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.AMDefinition; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.slf4j.Logger; @@ -93,19 +93,15 @@ public class DAGAMSimulator extends AMSimulator { LoggerFactory.getLogger(DAGAMSimulator.class); @SuppressWarnings("checkstyle:parameternumber") - public void init(int heartbeatInterval, - List containerList, ResourceManager resourceManager, - SLSRunner slsRunnner, long startTime, long finishTime, String simUser, - String simQueue, boolean tracked, String oldApp, long baseTimeMS, - Resource amResource, String nodeLabelExpr, Map params, - Map appIdAMSim) { - super.init(heartbeatInterval, containerList, resourceManager, slsRunnner, - startTime, finishTime, simUser, simQueue, tracked, oldApp, baseTimeMS, - amResource, nodeLabelExpr, params, appIdAMSim); + public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner, + boolean tracked, long baselineTimeMS, long heartbeatInterval, + Map appIdToAMSim) { + super.init(amDef, rm, slsRunner, tracked, baselineTimeMS, heartbeatInterval, + appIdToAMSim); super.amtype = "dag"; - allContainers.addAll(containerList); - pendingContainers.addAll(containerList); + allContainers.addAll(amDef.getTaskContainers()); + pendingContainers.addAll(amDef.getTaskContainers()); totalContainers = allContainers.size(); LOG.info("Added new job with {} containers", allContainers.size()); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 184fdca2e57..976c0229b86 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.AMDefinition; import org.apache.hadoop.yarn.sls.ReservationClientUtil; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.SLSRunner; @@ -123,19 +124,15 @@ public class MRAMSimulator extends AMSimulator { LoggerFactory.getLogger(MRAMSimulator.class); @SuppressWarnings("checkstyle:parameternumber") - public void init(int heartbeatInterval, - List containerList, ResourceManager rm, SLSRunner se, - long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId, long baselineStartTimeMS, - Resource amContainerResource, String nodeLabelExpr, - Map params, Map appIdAMSim) { - super.init(heartbeatInterval, containerList, rm, se, traceStartTime, - traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, - amContainerResource, nodeLabelExpr, params, appIdAMSim); + public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner, + boolean tracked, long baselineTimeMS, long heartbeatInterval, + Map appIdToAMSim) { + super.init(amDef, rm, slsRunner, tracked, baselineTimeMS, + heartbeatInterval, appIdToAMSim); amtype = "mapreduce"; // get map/reduce tasks - for (ContainerSimulator cs : containerList) { + for (ContainerSimulator cs : amDef.getTaskContainers()) { if (cs.getType().equals("map")) { cs.setPriority(PRIORITY_MAP); allMaps.add(cs); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java index 7e3545191f2..09297afd4d0 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java @@ -30,11 +30,11 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.AMDefinition; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.slf4j.Logger; @@ -93,21 +93,14 @@ public class StreamAMSimulator extends AMSimulator { LoggerFactory.getLogger(StreamAMSimulator.class); @SuppressWarnings("checkstyle:parameternumber") - public void init(int heartbeatInterval, - List containerList, ResourceManager rm, SLSRunner se, - long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId, long baselineStartTimeMS, - Resource amContainerResource, String nodeLabelExpr, - Map params, Map appIdAMSim) { - super.init(heartbeatInterval, containerList, rm, se, traceStartTime, - traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, - amContainerResource, nodeLabelExpr, params, appIdAMSim); + public void init(AMDefinition amDef, ResourceManager rm, SLSRunner slsRunner, + boolean tracked, long baselineTimeMS, long heartbeatInterval, + Map appIdToAMSim) { + super.init(amDef, rm, slsRunner, tracked, baselineTimeMS, + heartbeatInterval, appIdToAMSim); amtype = "stream"; - - allStreams.addAll(containerList); - - duration = traceFinishTime - traceStartTime; - + allStreams.addAll(amDef.getTaskContainers()); + duration = amDef.getJobFinishTime() - amDef.getJobStartTime(); LOG.info("Added new job with {} streams, running for {}", allStreams.size(), duration); } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java index e83ee91d8e1..8f119943570 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ContainerSimulator.java @@ -26,54 +26,41 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.sls.TaskContainerDefinition; @Private @Unstable public class ContainerSimulator implements Delayed { - // id private ContainerId id; - // resource allocated private Resource resource; - // end time private long endTime; // life time (ms) private long lifeTime; // time(ms) after which container would be requested by AM private long requestDelay; - // host name private String hostname; - // priority private int priority; - // type private String type; - // execution type private ExecutionType executionType = ExecutionType.GUARANTEED; - // allocation id private long allocationId; /** - * invoked when AM schedules containers to allocate. + * Invoked when AM schedules containers to allocate. + * @param def The task's definition object. + * @return ContainerSimulator object */ - public ContainerSimulator(Resource resource, long lifeTime, - String hostname, int priority, String type) { - this(resource, lifeTime, hostname, priority, type, - ExecutionType.GUARANTEED); + public static ContainerSimulator createFromTaskContainerDefinition( + TaskContainerDefinition def) { + return new ContainerSimulator(def.getResource(), def.getDuration(), + def.getHostname(), def.getPriority(), def.getType(), + def.getExecutionType(), def.getAllocationId(), def.getRequestDelay()); } /** - * invoked when AM schedules containers to allocate. - */ - public ContainerSimulator(Resource resource, long lifeTime, - String hostname, int priority, String type, ExecutionType executionType) { - this(resource, lifeTime, hostname, priority, type, - executionType, -1, 0); - } - - /** - * invoked when AM schedules containers to allocate. + * Invoked when AM schedules containers to allocate. */ @SuppressWarnings("checkstyle:parameternumber") - public ContainerSimulator(Resource resource, long lifeTime, + private ContainerSimulator(Resource resource, long lifeTime, String hostname, int priority, String type, ExecutionType executionType, long allocationId, long requestDelay) { this.resource = resource; @@ -87,7 +74,7 @@ public class ContainerSimulator implements Delayed { } /** - * invoke when NM schedules containers to run. + * Invoked when NM schedules containers to run. */ public ContainerSimulator(ContainerId id, Resource resource, long endTime, long lifeTime, long allocationId) { diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java index 256dcf46291..e529d1841a6 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java @@ -57,8 +57,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; @Private @Unstable public class SLSUtils { - public final static String DEFAULT_JOB_TYPE = "mapreduce"; - private static final String LABEL_FORMAT_ERR_MSG = "Input format for adding node-labels is not correct, it should be " + "labelName1[(exclusive=true/false)],labelName2[] .."; diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java index 8ac7fff75cb..e458b860e4d 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestDagAMSimulator.java @@ -26,6 +26,8 @@ import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Tests for DagAMSimulator. @@ -74,7 +76,17 @@ public class TestDagAMSimulator { private ContainerSimulator createContainerSim(long allocationId, long requestDelay) { - return new ContainerSimulator(null, 1000, "*", 1, "Map", - null, allocationId, requestDelay); + TaskContainerDefinition taskContainerDef = + mock(TaskContainerDefinition.class); + when(taskContainerDef.getResource()).thenReturn(null); + when(taskContainerDef.getDuration()).thenReturn(1000L); + when(taskContainerDef.getHostname()).thenReturn("*"); + when(taskContainerDef.getPriority()).thenReturn(1); + when(taskContainerDef.getType()).thenReturn("Map"); + when(taskContainerDef.getExecutionType()).thenReturn(null); + when(taskContainerDef.getAllocationId()).thenReturn(allocationId); + when(taskContainerDef.getRequestDelay()).thenReturn(requestDelay); + return ContainerSimulator.createFromTaskContainerDefinition( + taskContainerDef); } } diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index 50ac700d9c6..f5db1684c71 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.sls.appmaster; import com.codahale.metrics.MetricRegistry; import java.util.HashMap; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.tools.rumen.datatypes.UserName; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; @@ -33,6 +34,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.sls.AMDefinitionRumen; +import org.apache.hadoop.yarn.sls.TaskContainerDefinition; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; @@ -57,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @RunWith(Parameterized.class) @@ -157,9 +161,20 @@ public class TestAMSimulator { String queue = "default"; List containers = new ArrayList<>(); HashMap map = new HashMap<>(); - app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, - appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null, - map); + + UserName mockUser = mock(UserName.class); + when(mockUser.getValue()).thenReturn("user1"); + AMDefinitionRumen amDef = + AMDefinitionRumen.Builder.create() + .withUser(mockUser) + .withQueue(queue) + .withJobId(appId) + .withJobStartTime(0) + .withJobFinishTime(1000000L) + .withAmResource(SLSConfiguration.getAMContainerResource(conf)) + .withTaskContainers(containers) + .build(); + app.init(amDef, rm, null, true, 0, 1000, map); app.firstStep(); verifySchedulerMetrics(appId); @@ -184,9 +199,21 @@ public class TestAMSimulator { String queue = "default"; List containers = new ArrayList<>(); HashMap map = new HashMap<>(); - app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, - appId, 0, SLSConfiguration.getAMContainerResource(conf), "label1", - null, map); + + UserName mockUser = mock(UserName.class); + when(mockUser.getValue()).thenReturn("user1"); + AMDefinitionRumen amDef = + AMDefinitionRumen.Builder.create() + .withUser(mockUser) + .withQueue(queue) + .withJobId(appId) + .withJobStartTime(0) + .withJobFinishTime(1000000L) + .withAmResource(SLSConfiguration.getAMContainerResource(conf)) + .withTaskContainers(containers) + .withLabelExpression("label1") + .build(); + app.init(amDef, rm, null, true, 0, 1000, map); app.firstStep(); verifySchedulerMetrics(appId); @@ -201,7 +228,7 @@ public class TestAMSimulator { } @Test - public void testPackageRequests() { + public void testPackageRequests() throws YarnException { MockAMSimulator app = new MockAMSimulator(); List containerSimulators = new ArrayList<>(); Resource resource = Resources.createResource(1024); @@ -209,12 +236,25 @@ public class TestAMSimulator { ExecutionType execType = ExecutionType.GUARANTEED; String type = "map"; - ContainerSimulator s1 = new ContainerSimulator(resource, 100, - "/default-rack/h1", priority, type, execType); - ContainerSimulator s2 = new ContainerSimulator(resource, 100, - "/default-rack/h1", priority, type, execType); - ContainerSimulator s3 = new ContainerSimulator(resource, 100, - "/default-rack/h2", priority, type, execType); + TaskContainerDefinition.Builder builder = + TaskContainerDefinition.Builder.create() + .withResource(resource) + .withDuration(100) + .withPriority(1) + .withType(type) + .withExecutionType(execType) + .withAllocationId(-1) + .withRequestDelay(0); + + ContainerSimulator s1 = ContainerSimulator + .createFromTaskContainerDefinition( + builder.withHostname("/default-rack/h1").build()); + ContainerSimulator s2 = ContainerSimulator + .createFromTaskContainerDefinition( + builder.withHostname("/default-rack/h1").build()); + ContainerSimulator s3 = ContainerSimulator + .createFromTaskContainerDefinition( + builder.withHostname("/default-rack/h2").build()); containerSimulators.add(s1); containerSimulators.add(s2); @@ -250,12 +290,15 @@ public class TestAMSimulator { Assert.assertEquals(2, nodeRequestCount); containerSimulators.clear(); - s1 = new ContainerSimulator(resource, 100, - "/default-rack/h1", priority, type, execType, 1, 0); - s2 = new ContainerSimulator(resource, 100, - "/default-rack/h1", priority, type, execType, 2, 0); - s3 = new ContainerSimulator(resource, 100, - "/default-rack/h2", priority, type, execType, 1, 0); + s1 = ContainerSimulator.createFromTaskContainerDefinition( + createDefaultTaskContainerDefMock(resource, priority, execType, type, + "/default-rack/h1", 1)); + s2 = ContainerSimulator.createFromTaskContainerDefinition( + createDefaultTaskContainerDefMock(resource, priority, execType, type, + "/default-rack/h1", 2)); + s3 = ContainerSimulator.createFromTaskContainerDefinition( + createDefaultTaskContainerDefMock(resource, priority, execType, type, + "/default-rack/h2", 1)); containerSimulators.add(s1); containerSimulators.add(s2); @@ -317,6 +360,20 @@ public class TestAMSimulator { Assert.assertFalse(nm.getNode().getRunningApps().contains(app.appId)); Assert.assertTrue(nm.getNode().getRunningApps().isEmpty()); } + private TaskContainerDefinition createDefaultTaskContainerDefMock( + Resource resource, int priority, ExecutionType execType, String type, + String hostname, long allocationId) { + TaskContainerDefinition taskContainerDef = + mock(TaskContainerDefinition.class); + when(taskContainerDef.getResource()).thenReturn(resource); + when(taskContainerDef.getDuration()).thenReturn(100L); + when(taskContainerDef.getPriority()).thenReturn(priority); + when(taskContainerDef.getType()).thenReturn(type); + when(taskContainerDef.getExecutionType()).thenReturn(execType); + when(taskContainerDef.getHostname()).thenReturn(hostname); + when(taskContainerDef.getAllocationId()).thenReturn(allocationId); + return taskContainerDef; + } @After public void tearDown() {