From ab8c36062047e5c4c76ab0399162af9765988690 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Sat, 26 Dec 2020 23:59:30 +0100 Subject: [PATCH] YARN-10550. Decouple NM runner logic from SLSRunner. Contributed by Szilard Nemeth --- .../org/apache/hadoop/yarn/sls/AMRunner.java | 10 +- .../org/apache/hadoop/yarn/sls/NMRunner.java | 238 ++++++++++++++++++ .../org/apache/hadoop/yarn/sls/SLSRunner.java | 213 ++++------------ .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 1 - .../yarn/sls/scheduler/RMNodeWrapper.java | 1 - .../yarn/sls/scheduler/SchedulerMetrics.java | 12 +- 6 files changed, 296 insertions(+), 179 deletions(-) create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java index 301b4260f35..d80337688d5 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/AMRunner.java @@ -63,7 +63,6 @@ public class AMRunner { private Map amClassMap; private TraceType inputType; private String[] inputTraces; - private SynthTraceJobProducer stjp; private TaskRunner runner; private SLSRunner slsRunner; private int numAMs, numTasks; @@ -148,16 +147,15 @@ public class AMRunner { private void startAMFromSynthGenerator() throws YarnException, IOException { Configuration localConf = new Configuration(); localConf.set("fs.defaultFS", "file:///"); - // if we use the nodeFile this could have been not initialized yet. - if (stjp == null) { - stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0])); - slsRunner.setStjp(stjp); + //if we use the nodeFile this could have been not initialized yet. + if (slsRunner.getStjp() == null) { + slsRunner.setStjp(new SynthTraceJobProducer(conf, new Path(inputTraces[0]))); } SynthJob job; // we use stjp, a reference to the job producer instantiated during node // creation - while ((job = (SynthJob) stjp.getNextJob()) != null) { + while ((job = (SynthJob) slsRunner.getStjp().getNextJob()) != null) { ReservationId reservationId = null; if (job.hasDeadline()) { reservationId = ReservationId diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java new file mode 100644 index 00000000000..224e1e373ff --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/NMRunner.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.sls.SLSRunner.TraceType; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; +import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; +import org.apache.hadoop.yarn.sls.utils.SLSUtils; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class NMRunner { + private static final Logger LOG = LoggerFactory.getLogger(NMRunner.class); + + // other simulation information + private int numNMs, numRacks; + + // NM simulator + private Map nmMap; + private Resource nodeManagerResource; + private String nodeFile; + private TaskRunner taskRunner; + private Configuration conf; + private ResourceManager rm; + private String tableMapping; + private int threadPoolSize; + private TraceType inputType; + private String[] inputTraces; + private SynthTraceJobProducer stjp; + + public NMRunner(TaskRunner taskRunner, Configuration conf, ResourceManager rm, String tableMapping, int threadPoolSize) { + this.taskRunner = taskRunner; + this.conf = conf; + this.rm = rm; + this.tableMapping = tableMapping; + this.threadPoolSize = threadPoolSize; + this.nmMap = new ConcurrentHashMap<>(); + this.nodeManagerResource = getNodeManagerResourceFromConf(); + } + + public void startNM() throws YarnException, IOException, + InterruptedException { + // nm configuration + int heartbeatInterval = conf.getInt( + SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, + SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT); + float resourceUtilizationRatio = conf.getFloat( + SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO, + SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT); + // nm information (fetch from topology file, or from sls/rumen json file) + Set nodeSet = null; + if (nodeFile.isEmpty()) { + for (String inputTrace : inputTraces) { + switch (inputType) { + case SLS: + nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace); + break; + case RUMEN: + nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace); + break; + case SYNTH: + stjp = new SynthTraceJobProducer(conf, new Path(inputTraces[0])); + nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(), + stjp.getNumNodes()/stjp.getNodesPerRack()); + break; + default: + throw new YarnException("Input configuration not recognized, " + + "trace type should be SLS, RUMEN, or SYNTH"); + } + } + } else { + nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile, + nodeManagerResource); + } + + if (nodeSet == null || nodeSet.isEmpty()) { + throw new YarnException("No node! Please configure nodes."); + } + + SLSUtils.generateNodeTableMapping(nodeSet, tableMapping); + + // create NM simulators + Random random = new Random(); + Set rackSet = ConcurrentHashMap.newKeySet(); + int threadPoolSize = Math.max(this.threadPoolSize, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + ExecutorService executorService = Executors. + newFixedThreadPool(threadPoolSize); + for (SLSRunner.NodeDetails nodeDetails : nodeSet) { + executorService.submit(new Runnable() { + @Override public void run() { + try { + // we randomize the heartbeat start time from zero to 1 interval + NMSimulator nm = new NMSimulator(); + Resource nmResource = nodeManagerResource; + String hostName = nodeDetails.getHostname(); + if (nodeDetails.getNodeResource() != null) { + nmResource = nodeDetails.getNodeResource(); + } + Set nodeLabels = nodeDetails.getLabels(); + nm.init(hostName, nmResource, + random.nextInt(heartbeatInterval), + heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels); + nmMap.put(nm.getNode().getNodeID(), nm); + taskRunner.schedule(nm); + rackSet.add(nm.getNode().getRackName()); + } catch (IOException | YarnException e) { + LOG.error("Got an error while adding node", e); + } + } + }); + } + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.MINUTES); + numRacks = rackSet.size(); + numNMs = nmMap.size(); + } + + void waitForNodesRunning() throws InterruptedException { + long startTimeMS = System.currentTimeMillis(); + while (true) { + int numRunningNodes = 0; + for (RMNode node : rm.getRMContext().getRMNodes().values()) { + if (node.getState() == NodeState.RUNNING) { + numRunningNodes++; + } + } + if (numRunningNodes == numNMs) { + break; + } + LOG.info("SLSRunner is waiting for all nodes RUNNING." + + " {} of {} NMs initialized.", numRunningNodes, numNMs); + Thread.sleep(1000); + } + LOG.info("SLSRunner takes {} ms to launch all nodes.", + System.currentTimeMillis() - startTimeMS); + } + + private Resource getNodeManagerResourceFromConf() { + Resource resource = Resources.createResource(0); + ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); + for (ResourceInformation info : infors) { + long value; + if (info.getName().equals(ResourceInformation.MEMORY_URI)) { + value = conf.getInt(SLSConfiguration.NM_MEMORY_MB, + SLSConfiguration.NM_MEMORY_MB_DEFAULT); + } else if (info.getName().equals(ResourceInformation.VCORES_URI)) { + value = conf.getInt(SLSConfiguration.NM_VCORES, + SLSConfiguration.NM_VCORES_DEFAULT); + } else { + value = conf.getLong(SLSConfiguration.NM_PREFIX + + info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT); + } + + resource.setResourceValue(info.getName(), value); + } + + return resource; + } + + public void setNodeFile(String nodeFile) { + this.nodeFile = nodeFile; + } + + + public void setInputType(TraceType inputType) { + this.inputType = inputType; + } + + public void setInputTraces(String[] inputTraces) { + this.inputTraces = inputTraces; + } + + public int getNumNMs() { + return numNMs; + } + + public int getNumRacks() { + return numRacks; + } + + public Resource getNodeManagerResource() { + return nodeManagerResource; + } + + public Map getNmMap() { + return nmMap; + } + + public SynthTraceJobProducer getStjp() { + return stjp; + } + + public void setTableMapping(String tableMapping) { + this.tableMapping = tableMapping; + } + + public void setRm(ResourceManager rm) { + this.rm = rm; + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index e9ae7f51dba..318476427a6 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -59,11 +59,9 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; @@ -71,8 +69,6 @@ import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; import org.apache.hadoop.yarn.sls.scheduler.Tracker; import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; -import org.apache.hadoop.yarn.sls.utils.SLSUtils; -import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,32 +81,19 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Random; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; @Private @Unstable public class SLSRunner extends Configured implements Tool { private static TaskRunner runner = new TaskRunner(); private String[] inputTraces; - private int poolSize; - - // NM simulator - private Map nmMap; - private Resource nodeManagerResource; - private String nodeFile; // metrics private boolean printSimulation; - // other simulation information - private int numNMs, numRacks; - - private final static Map simulateInfoMap = new HashMap<>(); + private final static Map simulateInfoMap = + new HashMap<>(); // logger public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class); @@ -118,6 +101,9 @@ public class SLSRunner extends Configured implements Tool { private static boolean exitAtTheFinish = false; private AMRunner amRunner; private RMRunner rmRunner; + private NMRunner nmRunner; + + private SynthTraceJobProducer stjp; /** * The type of trace in input. @@ -130,19 +116,16 @@ public class SLSRunner extends Configured implements Tool { public static final String NETWORK_NEGATIVE_CACHE_TTL = "networkaddress.cache.negative.ttl"; - private TraceType inputType; - private SynthTraceJobProducer stjp; - public static int getRemainingApps() { return AMRunner.REMAINING_APPS; } - public SLSRunner() throws ClassNotFoundException { + public SLSRunner() throws ClassNotFoundException, YarnException { Configuration tempConf = new Configuration(false); init(tempConf); } - public SLSRunner(Configuration tempConf) throws ClassNotFoundException { + public SLSRunner(Configuration tempConf) throws ClassNotFoundException, YarnException { init(tempConf); } @@ -156,43 +139,31 @@ public class SLSRunner extends Configured implements Tool { super.setConf(conf); } - private void init(Configuration tempConf) throws ClassNotFoundException { + private void init(Configuration tempConf) throws ClassNotFoundException, YarnException { // runner configuration setConf(tempConf); - - nmMap = new ConcurrentHashMap<>(); - amRunner = new AMRunner(runner, this); - rmRunner = new RMRunner(tempConf, this); - - // runner - poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, + + int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); SLSRunner.runner.setQueueSize(poolSize); + rmRunner = new RMRunner(getConf(), this); + nmRunner = new NMRunner(runner, getConf(), rmRunner.getRm(), rmRunner.getTableMapping(), poolSize); + amRunner = new AMRunner(runner, this); amRunner.init(tempConf); - nodeManagerResource = getNodeManagerResource(); } - private Resource getNodeManagerResource() { - Resource resource = Resources.createResource(0); - ResourceInformation[] infors = ResourceUtils.getResourceTypesArray(); - for (ResourceInformation info : infors) { - long value; - if (info.getName().equals(ResourceInformation.MEMORY_URI)) { - value = getConf().getInt(SLSConfiguration.NM_MEMORY_MB, - SLSConfiguration.NM_MEMORY_MB_DEFAULT); - } else if (info.getName().equals(ResourceInformation.VCORES_URI)) { - value = getConf().getInt(SLSConfiguration.NM_VCORES, - SLSConfiguration.NM_VCORES_DEFAULT); - } else { - value = getConf().getLong(SLSConfiguration.NM_PREFIX + - info.getName(), SLSConfiguration.NM_RESOURCE_DEFAULT); + private SynthTraceJobProducer getSynthJobTraceProducer() throws YarnException { + // if we use the nodeFile this could have been not initialized yet. + if (nmRunner.getStjp() != null) { + return nmRunner.getStjp(); + } else { + try { + return new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + } catch (IOException e) { + throw new YarnException("Failed to initialize SynthTraceJobProducer", e); } - - resource.setResourceValue(info.getName(), value); } - - return resource; } /** @@ -213,29 +184,37 @@ public class SLSRunner extends Configured implements Tool { */ public void setSimulationParams(TraceType inType, String[] inTraces, String nodes, String metricsOutputDir, Set trackApps, - boolean printsimulation) { - - this.inputType = inType; + boolean printsimulation) throws YarnException { this.inputTraces = inTraces.clone(); - this.amRunner.setInputType(this.inputType); + this.amRunner.setInputType(inType); this.amRunner.setInputTraces(this.inputTraces); this.amRunner.setTrackedApps(trackApps); - this.nodeFile = nodes; + this.nmRunner.setNodeFile(nodes); + this.nmRunner.setInputType(inType); + this.nmRunner.setInputTraces(this.inputTraces); this.printSimulation = printsimulation; this.rmRunner.setMetricsOutputDir(metricsOutputDir); - this.rmRunner.setTableMapping(metricsOutputDir + "/tableMapping.csv"); + String tableMapping = metricsOutputDir + "/tableMapping.csv"; + this.rmRunner.setTableMapping(tableMapping); + this.nmRunner.setTableMapping(tableMapping); + + //We need this.inputTraces to set before creating SynthTraceJobProducer + if (inType == TraceType.SYNTH) { + this.stjp = getSynthJobTraceProducer(); + } } public void start() throws IOException, ClassNotFoundException, YarnException, InterruptedException { - enableDNSCaching(getConf()); // start resource manager rmRunner.startRM(); + nmRunner.setRm(rmRunner.getRm()); amRunner.setResourceManager(rmRunner.getRm()); + // start node managers - startNM(); + nmRunner.startNM(); // start application masters amRunner.startAM(); @@ -248,7 +227,7 @@ public class SLSRunner extends Configured implements Tool { // print out simulation info printSimulationInfo(); // blocked until all nodes RUNNING - waitForNodesRunning(); + nmRunner.waitForNodesRunning(); // starting the runner once everything is ready to go, runner.start(); } @@ -270,104 +249,6 @@ public class SLSRunner extends Configured implements Tool { } } - private void startNM() throws YarnException, IOException, - InterruptedException { - // nm configuration - int heartbeatInterval = getConf().getInt( - SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, - SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT); - float resourceUtilizationRatio = getConf().getFloat( - SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO, - SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT); - // nm information (fetch from topology file, or from sls/rumen json file) - Set nodeSet = null; - if (nodeFile.isEmpty()) { - for (String inputTrace : inputTraces) { - switch (inputType) { - case SLS: - nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace); - break; - case RUMEN: - nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace); - break; - case SYNTH: - stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); - nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(), - stjp.getNumNodes()/stjp.getNodesPerRack()); - break; - default: - throw new YarnException("Input configuration not recognized, " - + "trace type should be SLS, RUMEN, or SYNTH"); - } - } - } else { - nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile, - nodeManagerResource); - } - - if (nodeSet == null || nodeSet.isEmpty()) { - throw new YarnException("No node! Please configure nodes."); - } - - SLSUtils.generateNodeTableMapping(nodeSet, rmRunner.getTableMapping()); - - // create NM simulators - Random random = new Random(); - Set rackSet = ConcurrentHashMap.newKeySet(); - int threadPoolSize = Math.max(poolSize, - SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); - ExecutorService executorService = Executors. - newFixedThreadPool(threadPoolSize); - for (NodeDetails nodeDetails : nodeSet) { - executorService.submit(new Runnable() { - @Override public void run() { - try { - // we randomize the heartbeat start time from zero to 1 interval - NMSimulator nm = new NMSimulator(); - Resource nmResource = nodeManagerResource; - String hostName = nodeDetails.getHostname(); - if (nodeDetails.getNodeResource() != null) { - nmResource = nodeDetails.getNodeResource(); - } - Set nodeLabels = nodeDetails.getLabels(); - nm.init(hostName, nmResource, - random.nextInt(heartbeatInterval), - heartbeatInterval, rmRunner.getRm(), resourceUtilizationRatio, nodeLabels); - nmMap.put(nm.getNode().getNodeID(), nm); - runner.schedule(nm); - rackSet.add(nm.getNode().getRackName()); - } catch (IOException | YarnException e) { - LOG.error("Got an error while adding node", e); - } - } - }); - } - executorService.shutdown(); - executorService.awaitTermination(10, TimeUnit.MINUTES); - numRacks = rackSet.size(); - numNMs = nmMap.size(); - } - - private void waitForNodesRunning() throws InterruptedException { - long startTimeMS = System.currentTimeMillis(); - while (true) { - int numRunningNodes = 0; - for (RMNode node : rmRunner.getRm().getRMContext().getRMNodes().values()) { - if (node.getState() == NodeState.RUNNING) { - numRunningNodes++; - } - } - if (numRunningNodes == numNMs) { - break; - } - LOG.info("SLSRunner is waiting for all nodes RUNNING." - + " {} of {} NMs initialized.", numRunningNodes, numNMs); - Thread.sleep(1000); - } - LOG.info("SLSRunner takes {} ms to launch all nodes.", - System.currentTimeMillis() - startTimeMS); - } - Resource getDefaultContainerResource() { int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB, SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); @@ -392,7 +273,7 @@ public class SLSRunner extends Configured implements Tool { LOG.info("------------------------------------"); LOG.info("# nodes = {}, # racks = {}, capacity " + "of each node {}.", - numNMs, numRacks, nodeManagerResource); + nmRunner.getNumNMs(), nmRunner.getNumRacks(), nmRunner.getNodeManagerResource()); LOG.info("------------------------------------"); // job LOG.info("# applications = {}, # total " + @@ -416,12 +297,12 @@ public class SLSRunner extends Configured implements Tool { LOG.info("------------------------------------"); } // package these information in the simulateInfoMap used by other places - simulateInfoMap.put("Number of racks", numRacks); - simulateInfoMap.put("Number of nodes", numNMs); + simulateInfoMap.put("Number of racks", nmRunner.getNumRacks()); + simulateInfoMap.put("Number of nodes", nmRunner.getNumNMs()); simulateInfoMap.put("Node memory (MB)", - nodeManagerResource.getResourceValue(ResourceInformation.MEMORY_URI)); + nmRunner.getNodeManagerResource().getResourceValue(ResourceInformation.MEMORY_URI)); simulateInfoMap.put("Node VCores", - nodeManagerResource.getResourceValue(ResourceInformation.VCORES_URI)); + nmRunner.getNodeManagerResource().getResourceValue(ResourceInformation.VCORES_URI)); simulateInfoMap.put("Number of applications", numAMs); simulateInfoMap.put("Number of tasks", numTasks); simulateInfoMap.put("Average tasks per applicaion", @@ -434,7 +315,7 @@ public class SLSRunner extends Configured implements Tool { } public Map getNmMap() { - return nmMap; + return nmRunner.getNmMap(); } public static void decreaseRemainingApps() { @@ -458,7 +339,6 @@ public class SLSRunner extends Configured implements Tool { public int run(final String[] argv) throws IOException, InterruptedException, ParseException, ClassNotFoundException, YarnException { - Options options = new Options(); // Left for compatibility @@ -524,7 +404,6 @@ public class SLSRunner extends Configured implements Tool { case "RUMEN": tempTraceType = TraceType.RUMEN; break; - case "SYNTH": tempTraceType = TraceType.SYNTH; break; @@ -537,7 +416,7 @@ public class SLSRunner extends Configured implements Tool { setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output, trackedJobSet, cmd.hasOption("printsimulation")); - + start(); return 0; diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 32567db666e..a22230f8661 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -186,7 +186,6 @@ public class NodeInfo { @Override public List pullNewlyIncreasedContainers() { - // TODO Auto-generated method stub return null; } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 26d35ac8972..dbbc88fb52d 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -176,7 +176,6 @@ public class RMNodeWrapper implements RMNode { @Override public List pullNewlyIncreasedContainers() { - // TODO Auto-generated method stub return Collections.emptyList(); } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java index 26a9da4cd8b..26fbcd78f39 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java @@ -316,7 +316,7 @@ public abstract class SchedulerMetrics { new Gauge() { @Override public Long getValue() { - if (scheduler.getRootQueueMetrics() == null) { + if (isMetricsAvailable()) { return 0L; } else { return scheduler.getRootQueueMetrics().getAllocatedMB(); @@ -328,7 +328,7 @@ public abstract class SchedulerMetrics { new Gauge() { @Override public Integer getValue() { - if (scheduler.getRootQueueMetrics() == null) { + if (isMetricsAvailable()) { return 0; } else { return scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); @@ -340,7 +340,7 @@ public abstract class SchedulerMetrics { new Gauge() { @Override public Long getValue() { - if (scheduler.getRootQueueMetrics() == null) { + if (isMetricsAvailable()) { return 0L; } else { return scheduler.getRootQueueMetrics().getAvailableMB(); @@ -352,7 +352,7 @@ public abstract class SchedulerMetrics { new Gauge() { @Override public Integer getValue() { - if (scheduler.getRootQueueMetrics() == null) { + if (isMetricsAvailable()) { return 0; } else { return scheduler.getRootQueueMetrics().getAvailableVirtualCores(); @@ -362,6 +362,10 @@ public abstract class SchedulerMetrics { ); } + private boolean isMetricsAvailable() { + return scheduler.getRootQueueMetrics() == null; + } + private void registerContainerAppNumMetrics() { metrics.register("variable.running.application", new Gauge() {