YARN-11103. SLS cleanup after previously merged SLS refactor jiras. Contributed by Szilard Nemeth
This commit is contained in:
parent
e044a46f97
commit
94031b729d
|
@ -53,7 +53,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class AMRunner {
|
public class AMRunner {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class);
|
||||||
static int REMAINING_APPS = 0;
|
static int remainingApps = 0;
|
||||||
|
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private int AM_ID;
|
private int AM_ID;
|
||||||
|
@ -63,8 +63,8 @@ public class AMRunner {
|
||||||
private Map<String, Class> amClassMap;
|
private Map<String, Class> amClassMap;
|
||||||
private TraceType inputType;
|
private TraceType inputType;
|
||||||
private String[] inputTraces;
|
private String[] inputTraces;
|
||||||
private TaskRunner runner;
|
private final TaskRunner runner;
|
||||||
private SLSRunner slsRunner;
|
private final SLSRunner slsRunner;
|
||||||
private int numAMs, numTasks;
|
private int numAMs, numTasks;
|
||||||
private long maxRuntime;
|
private long maxRuntime;
|
||||||
private ResourceManager rm;
|
private ResourceManager rm;
|
||||||
|
@ -81,8 +81,8 @@ public class AMRunner {
|
||||||
amClassMap = new HashMap<>();
|
amClassMap = new HashMap<>();
|
||||||
appIdAMSim = new ConcurrentHashMap<>();
|
appIdAMSim = new ConcurrentHashMap<>();
|
||||||
// <AMType, Class> map
|
// <AMType, Class> map
|
||||||
for (Map.Entry e : conf) {
|
for (Map.Entry<String, String> e : conf) {
|
||||||
String key = e.getKey().toString();
|
String key = e.getKey();
|
||||||
if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
|
if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
|
||||||
String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
|
String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
|
||||||
amClassMap.put(amType, Class.forName(conf.get(key)));
|
amClassMap.put(amType, Class.forName(conf.get(key)));
|
||||||
|
@ -112,7 +112,7 @@ public class AMRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
numAMs = amMap.size();
|
numAMs = amMap.size();
|
||||||
REMAINING_APPS = numAMs;
|
remainingApps = numAMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -70,9 +70,7 @@ public final class ReservationClientUtil {
|
||||||
deadline, reservationRequests, name);
|
deadline, reservationRequests, name);
|
||||||
|
|
||||||
// outermost request
|
// outermost request
|
||||||
ReservationSubmissionRequest request = ReservationSubmissionRequest
|
return ReservationSubmissionRequest
|
||||||
.newInstance(resDef, queueName, reservationId);
|
.newInstance(resDef, queueName, reservationId);
|
||||||
|
|
||||||
return request;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.io.InputStreamReader;
|
||||||
import java.io.OutputStreamWriter;
|
import java.io.OutputStreamWriter;
|
||||||
import java.io.Reader;
|
import java.io.Reader;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
|
@ -54,7 +55,7 @@ public class RumenToSLSConverter {
|
||||||
private static Map<String, Set<String>> rackNodeMap =
|
private static Map<String, Set<String>> rackNodeMap =
|
||||||
new TreeMap<String, Set<String>>();
|
new TreeMap<String, Set<String>>();
|
||||||
|
|
||||||
public static void main(String args[]) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
Options options = new Options();
|
Options options = new Options();
|
||||||
options.addOption("input", true, "input rumen json file");
|
options.addOption("input", true, "input rumen json file");
|
||||||
options.addOption("outputJobs", true, "output jobs file");
|
options.addOption("outputJobs", true, "output jobs file");
|
||||||
|
@ -121,9 +122,10 @@ public class RumenToSLSConverter {
|
||||||
private static void generateSLSLoadFile(String inputFile, String outputFile)
|
private static void generateSLSLoadFile(String inputFile, String outputFile)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try (Reader input =
|
try (Reader input =
|
||||||
new InputStreamReader(new FileInputStream(inputFile), "UTF-8")) {
|
new InputStreamReader(new FileInputStream(inputFile),
|
||||||
|
StandardCharsets.UTF_8)) {
|
||||||
try (Writer output =
|
try (Writer output =
|
||||||
new OutputStreamWriter(new FileOutputStream(outputFile), "UTF-8")) {
|
new OutputStreamWriter(new FileOutputStream(outputFile), StandardCharsets.UTF_8)) {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
|
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
|
||||||
Iterator<Map> i = mapper.readValues(
|
Iterator<Map> i = mapper.readValues(
|
||||||
|
@ -140,7 +142,7 @@ public class RumenToSLSConverter {
|
||||||
private static void generateSLSNodeFile(String outputFile)
|
private static void generateSLSNodeFile(String outputFile)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try (Writer output =
|
try (Writer output =
|
||||||
new OutputStreamWriter(new FileOutputStream(outputFile), "UTF-8")) {
|
new OutputStreamWriter(new FileOutputStream(outputFile), StandardCharsets.UTF_8)) {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
|
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
|
||||||
for (Map.Entry<String, Set<String>> entry : rackNodeMap.entrySet()) {
|
for (Map.Entry<String, Set<String>> entry : rackNodeMap.entrySet()) {
|
||||||
|
@ -218,7 +220,7 @@ public class RumenToSLSConverter {
|
||||||
task.put("container.priority", priority);
|
task.put("container.priority", priority);
|
||||||
task.put("container.type", taskType);
|
task.put("container.type", taskType);
|
||||||
array.add(task);
|
array.add(task);
|
||||||
String rackHost[] = SLSUtils.getRackHostName(hostname);
|
String[] rackHost = SLSUtils.getRackHostName(hostname);
|
||||||
if (rackNodeMap.containsKey(rackHost[0])) {
|
if (rackNodeMap.containsKey(rackHost[0])) {
|
||||||
rackNodeMap.get(rackHost[0]).add(rackHost[1]);
|
rackNodeMap.get(rackHost[0]).add(rackHost[1]);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -18,16 +18,11 @@
|
||||||
package org.apache.hadoop.yarn.sls;
|
package org.apache.hadoop.yarn.sls;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.io.Reader;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.security.Security;
|
import java.security.Security;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -38,10 +33,6 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonFactory;
|
|
||||||
import com.fasterxml.jackson.databind.JavaType;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.CommandLineParser;
|
import org.apache.commons.cli.CommandLineParser;
|
||||||
import org.apache.commons.cli.GnuParser;
|
import org.apache.commons.cli.GnuParser;
|
||||||
|
@ -73,20 +64,10 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.security.Security;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class SLSRunner extends Configured implements Tool {
|
public class SLSRunner extends Configured implements Tool {
|
||||||
private static TaskRunner runner = new TaskRunner();
|
private static final TaskRunner runner = new TaskRunner();
|
||||||
private String[] inputTraces;
|
private String[] inputTraces;
|
||||||
|
|
||||||
// metrics
|
// metrics
|
||||||
|
@ -103,6 +84,7 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
private RMRunner rmRunner;
|
private RMRunner rmRunner;
|
||||||
private NMRunner nmRunner;
|
private NMRunner nmRunner;
|
||||||
|
|
||||||
|
private TraceType inputType;
|
||||||
private SynthTraceJobProducer stjp;
|
private SynthTraceJobProducer stjp;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -117,7 +99,7 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
"networkaddress.cache.negative.ttl";
|
"networkaddress.cache.negative.ttl";
|
||||||
|
|
||||||
public static int getRemainingApps() {
|
public static int getRemainingApps() {
|
||||||
return AMRunner.REMAINING_APPS;
|
return AMRunner.remainingApps;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SLSRunner() throws ClassNotFoundException, YarnException {
|
public SLSRunner() throws ClassNotFoundException, YarnException {
|
||||||
|
@ -175,31 +157,32 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is invoked before start.
|
* This is invoked before start.
|
||||||
* @param inType
|
* @param inputType The trace type
|
||||||
* @param inTraces
|
* @param inTraces Input traces
|
||||||
* @param nodes
|
* @param nodes The node file
|
||||||
* @param metricsOutputDir
|
* @param metricsOutputDir Output dir for metrics
|
||||||
* @param trackApps
|
* @param trackApps Track these applications
|
||||||
* @param printsimulation
|
* @param printSimulation Whether to print the simulation
|
||||||
*/
|
*/
|
||||||
public void setSimulationParams(TraceType inType, String[] inTraces,
|
public void setSimulationParams(TraceType inputType, String[] inTraces,
|
||||||
String nodes, String metricsOutputDir, Set<String> trackApps,
|
String nodes, String metricsOutputDir, Set<String> trackApps,
|
||||||
boolean printsimulation) throws YarnException {
|
boolean printSimulation) throws YarnException {
|
||||||
|
this.inputType = inputType;
|
||||||
this.inputTraces = inTraces.clone();
|
this.inputTraces = inTraces.clone();
|
||||||
this.amRunner.setInputType(inType);
|
this.amRunner.setInputType(inputType);
|
||||||
this.amRunner.setInputTraces(this.inputTraces);
|
this.amRunner.setInputTraces(this.inputTraces);
|
||||||
this.amRunner.setTrackedApps(trackApps);
|
this.amRunner.setTrackedApps(trackApps);
|
||||||
this.nmRunner.setNodeFile(nodes);
|
this.nmRunner.setNodeFile(nodes);
|
||||||
this.nmRunner.setInputType(inType);
|
this.nmRunner.setInputType(inputType);
|
||||||
this.nmRunner.setInputTraces(this.inputTraces);
|
this.nmRunner.setInputTraces(this.inputTraces);
|
||||||
this.printSimulation = printsimulation;
|
this.printSimulation = printSimulation;
|
||||||
this.rmRunner.setMetricsOutputDir(metricsOutputDir);
|
this.rmRunner.setMetricsOutputDir(metricsOutputDir);
|
||||||
String tableMapping = metricsOutputDir + "/tableMapping.csv";
|
String tableMapping = metricsOutputDir + "/tableMapping.csv";
|
||||||
this.rmRunner.setTableMapping(tableMapping);
|
this.rmRunner.setTableMapping(tableMapping);
|
||||||
this.nmRunner.setTableMapping(tableMapping);
|
this.nmRunner.setTableMapping(tableMapping);
|
||||||
|
|
||||||
//We need this.inputTraces to set before creating SynthTraceJobProducer
|
//We need this.inputTraces to set before creating SynthTraceJobProducer
|
||||||
if (inType == TraceType.SYNTH) {
|
if (inputType == TraceType.SYNTH) {
|
||||||
this.stjp = getSynthJobTraceProducer();
|
this.stjp = getSynthJobTraceProducer();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -319,8 +302,8 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void decreaseRemainingApps() {
|
public static void decreaseRemainingApps() {
|
||||||
AMRunner.REMAINING_APPS--;
|
AMRunner.remainingApps--;
|
||||||
if (AMRunner.REMAINING_APPS == 0) {
|
if (AMRunner.remainingApps == 0) {
|
||||||
exitSLSRunner();
|
exitSLSRunner();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -359,24 +342,15 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
CommandLineParser parser = new GnuParser();
|
CommandLineParser parser = new GnuParser();
|
||||||
CommandLine cmd = parser.parse(options, argv);
|
CommandLine cmd = parser.parse(options, argv);
|
||||||
|
|
||||||
String traceType = null;
|
|
||||||
String traceLocation = null;
|
|
||||||
|
|
||||||
// compatibility with old commandline
|
// compatibility with old commandline
|
||||||
if (cmd.hasOption("inputrumen")) {
|
boolean hasInputRumenOption = cmd.hasOption("inputrumen");
|
||||||
traceType = "RUMEN";
|
boolean hasInputSlsOption = cmd.hasOption("inputsls");
|
||||||
traceLocation = cmd.getOptionValue("inputrumen");
|
boolean hasTraceTypeOption = cmd.hasOption("tracetype");
|
||||||
}
|
TraceType traceType = determineTraceType(cmd, hasInputRumenOption,
|
||||||
if (cmd.hasOption("inputsls")) {
|
hasInputSlsOption, hasTraceTypeOption);
|
||||||
traceType = "SLS";
|
String traceLocation = determineTraceLocation(cmd, hasInputRumenOption,
|
||||||
traceLocation = cmd.getOptionValue("inputsls");
|
hasInputSlsOption, hasTraceTypeOption);
|
||||||
}
|
|
||||||
|
|
||||||
if (cmd.hasOption("tracetype")) {
|
|
||||||
traceType = cmd.getOptionValue("tracetype");
|
|
||||||
traceLocation = cmd.getOptionValue("tracelocation");
|
|
||||||
}
|
|
||||||
|
|
||||||
String output = cmd.getOptionValue("output");
|
String output = cmd.getOptionValue("output");
|
||||||
|
|
||||||
File outputFile = new File(output);
|
File outputFile = new File(output);
|
||||||
|
@ -396,25 +370,9 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
String tempNodeFile =
|
String tempNodeFile =
|
||||||
cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
|
cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
|
||||||
|
|
||||||
TraceType tempTraceType;
|
|
||||||
switch (traceType) {
|
|
||||||
case "SLS":
|
|
||||||
tempTraceType = TraceType.SLS;
|
|
||||||
break;
|
|
||||||
case "RUMEN":
|
|
||||||
tempTraceType = TraceType.RUMEN;
|
|
||||||
break;
|
|
||||||
case "SYNTH":
|
|
||||||
tempTraceType = TraceType.SYNTH;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
printUsage();
|
|
||||||
throw new YarnException("Misconfigured input");
|
|
||||||
}
|
|
||||||
|
|
||||||
String[] inputFiles = traceLocation.split(",");
|
String[] inputFiles = traceLocation.split(",");
|
||||||
|
|
||||||
setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output,
|
setSimulationParams(traceType, inputFiles, tempNodeFile, output,
|
||||||
trackedJobSet, cmd.hasOption("printsimulation"));
|
trackedJobSet, cmd.hasOption("printsimulation"));
|
||||||
|
|
||||||
start();
|
start();
|
||||||
|
@ -422,6 +380,48 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TraceType determineTraceType(CommandLine cmd, boolean hasInputRumenOption,
|
||||||
|
boolean hasInputSlsOption, boolean hasTraceTypeOption) throws YarnException {
|
||||||
|
String traceType = null;
|
||||||
|
if (hasInputRumenOption) {
|
||||||
|
traceType = "RUMEN";
|
||||||
|
}
|
||||||
|
if (hasInputSlsOption) {
|
||||||
|
traceType = "SLS";
|
||||||
|
}
|
||||||
|
if (hasTraceTypeOption) {
|
||||||
|
traceType = cmd.getOptionValue("tracetype");
|
||||||
|
}
|
||||||
|
if (traceType == null) {
|
||||||
|
throw new YarnException("Misconfigured input");
|
||||||
|
}
|
||||||
|
switch (traceType) {
|
||||||
|
case "SLS":
|
||||||
|
return TraceType.SLS;
|
||||||
|
case "RUMEN":
|
||||||
|
return TraceType.RUMEN;
|
||||||
|
case "SYNTH":
|
||||||
|
return TraceType.SYNTH;
|
||||||
|
default:
|
||||||
|
printUsage();
|
||||||
|
throw new YarnException("Misconfigured input");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String determineTraceLocation(CommandLine cmd, boolean hasInputRumenOption,
|
||||||
|
boolean hasInputSlsOption, boolean hasTraceTypeOption) throws YarnException {
|
||||||
|
if (hasInputRumenOption) {
|
||||||
|
return cmd.getOptionValue("inputrumen");
|
||||||
|
}
|
||||||
|
if (hasInputSlsOption) {
|
||||||
|
return cmd.getOptionValue("inputsls");
|
||||||
|
}
|
||||||
|
if (hasTraceTypeOption) {
|
||||||
|
return cmd.getOptionValue("tracelocation");
|
||||||
|
}
|
||||||
|
throw new YarnException("Misconfigured input! ");
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] argv) throws Exception {
|
public static void main(String[] argv) throws Exception {
|
||||||
exitAtTheFinish = true;
|
exitAtTheFinish = true;
|
||||||
ToolRunner.run(new Configuration(), new SLSRunner(), argv);
|
ToolRunner.run(new Configuration(), new SLSRunner(), argv);
|
||||||
|
|
|
@ -85,7 +85,7 @@ public class NMSimulator extends TaskRunner.Task {
|
||||||
super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
|
super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
|
||||||
heartBeatInterval);
|
heartBeatInterval);
|
||||||
// create resource
|
// create resource
|
||||||
String rackHostName[] = SLSUtils.getRackHostName(nodeIdStr);
|
String[] rackHostName = SLSUtils.getRackHostName(nodeIdStr);
|
||||||
this.node = NodeInfo.newNodeInfo(rackHostName[0], rackHostName[1],
|
this.node = NodeInfo.newNodeInfo(rackHostName[0], rackHostName[1],
|
||||||
Resources.clone(nodeResource));
|
Resources.clone(nodeResource));
|
||||||
this.rm = pRm;
|
this.rm = pRm;
|
||||||
|
@ -128,7 +128,7 @@ public class NMSimulator extends TaskRunner.Task {
|
||||||
@Override
|
@Override
|
||||||
public void middleStep() throws Exception {
|
public void middleStep() throws Exception {
|
||||||
// we check the lifetime for each running containers
|
// we check the lifetime for each running containers
|
||||||
ContainerSimulator cs = null;
|
ContainerSimulator cs;
|
||||||
synchronized(completedContainerList) {
|
synchronized(completedContainerList) {
|
||||||
while ((cs = containerQueue.poll()) != null) {
|
while ((cs = containerQueue.poll()) != null) {
|
||||||
runningContainers.remove(cs.getId());
|
runningContainers.remove(cs.getId());
|
||||||
|
|
|
@ -258,9 +258,8 @@ public class NodeInfo {
|
||||||
final Resource resource, int port) {
|
final Resource resource, int port) {
|
||||||
final NodeId nodeId = newNodeID(hostName, port);
|
final NodeId nodeId = newNodeID(hostName, port);
|
||||||
final String nodeAddr = hostName + ":" + port;
|
final String nodeAddr = hostName + ":" + port;
|
||||||
final String httpAddress = hostName;
|
|
||||||
|
return new FakeRMNodeImpl(nodeId, nodeAddr, hostName,
|
||||||
return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress,
|
|
||||||
resource, rackName, "Me good",
|
resource, rackName, "Me good",
|
||||||
port, hostName, null);
|
port, hostName, null);
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptI
|
||||||
import org.apache.hadoop.yarn.sls.SLSRunner;
|
import org.apache.hadoop.yarn.sls.SLSRunner;
|
||||||
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
|
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public class MockAMLauncher extends ApplicationMasterLauncher
|
public class MockAMLauncher extends ApplicationMasterLauncher
|
||||||
implements EventHandler<AMLauncherEvent> {
|
implements EventHandler<AMLauncherEvent> {
|
||||||
|
|
|
@ -61,7 +61,7 @@ public class SLSSchedulerCommons {
|
||||||
private final Map<ApplicationAttemptId, String> appQueueMap = new ConcurrentHashMap<>();
|
private final Map<ApplicationAttemptId, String> appQueueMap = new ConcurrentHashMap<>();
|
||||||
private final Tracker tracker;
|
private final Tracker tracker;
|
||||||
|
|
||||||
public SLSSchedulerCommons(AbstractYarnScheduler scheduler) {
|
public SLSSchedulerCommons(AbstractYarnScheduler<?, ?> scheduler) {
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
this.tracker = new Tracker();
|
this.tracker = new Tracker();
|
||||||
}
|
}
|
||||||
|
@ -174,7 +174,7 @@ public class SLSSchedulerCommons {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// containers released/preemption from scheduler
|
// containers released/preemption from scheduler
|
||||||
Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
|
Set<ContainerId> preemptionContainers = new HashSet<>();
|
||||||
if (allocation.getContainerPreemptions() != null) {
|
if (allocation.getContainerPreemptions() != null) {
|
||||||
preemptionContainers.addAll(allocation.getContainerPreemptions());
|
preemptionContainers.addAll(allocation.getContainerPreemptions());
|
||||||
}
|
}
|
||||||
|
@ -277,7 +277,7 @@ public class SLSSchedulerCommons {
|
||||||
AppAttemptAddedSchedulerEvent appAddEvent =
|
AppAttemptAddedSchedulerEvent appAddEvent =
|
||||||
(AppAttemptAddedSchedulerEvent) schedulerEvent;
|
(AppAttemptAddedSchedulerEvent) schedulerEvent;
|
||||||
SchedulerApplication app =
|
SchedulerApplication app =
|
||||||
(SchedulerApplication) scheduler.getSchedulerApplications()
|
scheduler.getSchedulerApplications()
|
||||||
.get(appAddEvent.getApplicationAttemptId().getApplicationId());
|
.get(appAddEvent.getApplicationAttemptId().getApplicationId());
|
||||||
appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
|
appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
|
||||||
.getQueueName());
|
.getQueueName());
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.sls.scheduler;
|
||||||
|
|
||||||
import java.io.BufferedWriter;
|
import java.io.BufferedWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -184,7 +185,7 @@ public abstract class SchedulerMetrics {
|
||||||
// application running information
|
// application running information
|
||||||
jobRuntimeLogBW =
|
jobRuntimeLogBW =
|
||||||
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
|
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
|
||||||
metricsOutputDir + "/jobruntime.csv"), "UTF-8"));
|
metricsOutputDir + "/jobruntime.csv"), StandardCharsets.UTF_8));
|
||||||
jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
|
jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
|
||||||
"simulate_start_time,simulate_end_time" + EOL);
|
"simulate_start_time,simulate_end_time" + EOL);
|
||||||
jobRuntimeLogBW.flush();
|
jobRuntimeLogBW.flush();
|
||||||
|
@ -560,7 +561,7 @@ public abstract class SchedulerMetrics {
|
||||||
try {
|
try {
|
||||||
metricsLogBW =
|
metricsLogBW =
|
||||||
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
|
new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
|
||||||
metricsOutputDir + "/realtimetrack.json"), "UTF-8"));
|
metricsOutputDir + "/realtimetrack.json"), StandardCharsets.UTF_8));
|
||||||
metricsLogBW.write("[");
|
metricsLogBW.write("[");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.info(e.getMessage());
|
LOG.info(e.getMessage());
|
||||||
|
@ -717,11 +718,10 @@ public abstract class SchedulerMetrics {
|
||||||
long traceEndTimeMS, long simulateStartTimeMS, long simulateEndTimeMS) {
|
long traceEndTimeMS, long simulateStartTimeMS, long simulateEndTimeMS) {
|
||||||
try {
|
try {
|
||||||
// write job runtime information
|
// write job runtime information
|
||||||
StringBuilder sb = new StringBuilder();
|
String runtimeInfo = appId + "," + traceStartTimeMS + "," +
|
||||||
sb.append(appId).append(",").append(traceStartTimeMS).append(",")
|
traceEndTimeMS + "," + simulateStartTimeMS +
|
||||||
.append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
|
"," + simulateEndTimeMS;
|
||||||
.append(",").append(simulateEndTimeMS);
|
jobRuntimeLogBW.write(runtimeInfo + EOL);
|
||||||
jobRuntimeLogBW.write(sb.toString() + EOL);
|
|
||||||
jobRuntimeLogBW.flush();
|
jobRuntimeLogBW.flush();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.info(e.getMessage());
|
LOG.info(e.getMessage());
|
||||||
|
|
|
@ -184,9 +184,9 @@ public class SynthJob implements JobStory {
|
||||||
int num = task.count.getInt();
|
int num = task.count.getInt();
|
||||||
String taskType = task.type;
|
String taskType = task.type;
|
||||||
long memory = task.max_memory.getLong();
|
long memory = task.max_memory.getLong();
|
||||||
memory = memory < MIN_MEMORY ? MIN_MEMORY: memory;
|
memory = Math.max(memory, MIN_MEMORY);
|
||||||
long vcores = task.max_vcores.getLong();
|
long vcores = task.max_vcores.getLong();
|
||||||
vcores = vcores < MIN_VCORES ? MIN_VCORES : vcores;
|
vcores = Math.max(vcores, MIN_VCORES);
|
||||||
int priority = task.priority;
|
int priority = task.priority;
|
||||||
ExecutionType executionType = task.executionType == null
|
ExecutionType executionType = task.executionType == null
|
||||||
? ExecutionType.GUARANTEED
|
? ExecutionType.GUARANTEED
|
||||||
|
|
|
@ -310,7 +310,6 @@ public class SynthTraceJobProducer implements JobStoryProducer {
|
||||||
|
|
||||||
// Initialize job weights
|
// Initialize job weights
|
||||||
job_weights = new ArrayList<>();
|
job_weights = new ArrayList<>();
|
||||||
job_weights = new ArrayList<>();
|
|
||||||
for(JobDefinition j : job_classes){
|
for(JobDefinition j : job_classes){
|
||||||
job_weights.add(j.class_weight);
|
job_weights.add(j.class_weight);
|
||||||
}
|
}
|
||||||
|
@ -638,7 +637,7 @@ public class SynthTraceJobProducer implements JobStoryProducer {
|
||||||
public String toString(){
|
public String toString(){
|
||||||
switch(mode){
|
switch(mode){
|
||||||
case CONST:
|
case CONST:
|
||||||
return "value: " + Double.toString(val);
|
return "value: " + val;
|
||||||
case DIST:
|
case DIST:
|
||||||
return "value: " + this.val + " std: " + this.std + " dist: "
|
return "value: " + this.val + " std: " + this.std + " dist: "
|
||||||
+ this.dist.name();
|
+ this.dist.name();
|
||||||
|
|
|
@ -85,7 +85,7 @@ public class SLSUtils {
|
||||||
JobTraceReader reader = new JobTraceReader(
|
JobTraceReader reader = new JobTraceReader(
|
||||||
new Path(fin.getAbsolutePath()), conf);
|
new Path(fin.getAbsolutePath()), conf);
|
||||||
try {
|
try {
|
||||||
LoggedJob job = null;
|
LoggedJob job;
|
||||||
while ((job = reader.getNext()) != null) {
|
while ((job = reader.getNext()) != null) {
|
||||||
for(LoggedTask mapTask : job.getMapTasks()) {
|
for(LoggedTask mapTask : job.getMapTasks()) {
|
||||||
// select the last attempt
|
// select the last attempt
|
||||||
|
@ -123,7 +123,7 @@ public class SLSUtils {
|
||||||
JsonFactory jsonF = new JsonFactory();
|
JsonFactory jsonF = new JsonFactory();
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
Reader input =
|
Reader input =
|
||||||
new InputStreamReader(new FileInputStream(jobTrace), "UTF-8");
|
new InputStreamReader(new FileInputStream(jobTrace), StandardCharsets.UTF_8);
|
||||||
try {
|
try {
|
||||||
Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
|
Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
|
||||||
while (i.hasNext()) {
|
while (i.hasNext()) {
|
||||||
|
@ -170,7 +170,7 @@ public class SLSUtils {
|
||||||
JsonFactory jsonF = new JsonFactory();
|
JsonFactory jsonF = new JsonFactory();
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
Reader input =
|
Reader input =
|
||||||
new InputStreamReader(new FileInputStream(nodeFile), "UTF-8");
|
new InputStreamReader(new FileInputStream(nodeFile), StandardCharsets.UTF_8);
|
||||||
try {
|
try {
|
||||||
Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
|
Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
|
||||||
while (i.hasNext()) {
|
while (i.hasNext()) {
|
||||||
|
|
|
@ -103,7 +103,7 @@ public abstract class BaseSLSRunnerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nodeFile != null) {
|
if (nodeFile != null) {
|
||||||
args = ArrayUtils.addAll(args, new String[] {"-nodes", nodeFile });
|
args = ArrayUtils.addAll(args, "-nodes", nodeFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
// enable continuous invariant checks
|
// enable continuous invariant checks
|
||||||
|
|
|
@ -37,6 +37,7 @@ import java.util.Arrays;
|
||||||
|
|
||||||
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
|
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -88,7 +89,7 @@ public class TestSynthJobGeneration {
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertTrue(bucket0 > 0);
|
Assert.assertTrue(bucket0 > 0);
|
||||||
Assert.assertTrue(bucket1 == 0);
|
assertEquals(0, bucket1);
|
||||||
Assert.assertTrue(bucket2 > 0);
|
Assert.assertTrue(bucket2 > 0);
|
||||||
Assert.assertTrue(bucket3 > 0);
|
Assert.assertTrue(bucket3 > 0);
|
||||||
Assert.assertTrue(bucket2 > bucket0);
|
Assert.assertTrue(bucket2 > bucket0);
|
||||||
|
@ -255,7 +256,7 @@ public class TestSynthJobGeneration {
|
||||||
assertTrue(js.getTasks().size() > 0);
|
assertTrue(js.getTasks().size() > 0);
|
||||||
|
|
||||||
for (SynthJob.SynthTask t : js.getTasks()) {
|
for (SynthJob.SynthTask t : js.getTasks()) {
|
||||||
assertTrue(t.getType() != null);
|
assertNotNull(t.getType());
|
||||||
assertTrue(t.getTime() > 0);
|
assertTrue(t.getTime() > 0);
|
||||||
assertTrue(t.getMemory() > 0);
|
assertTrue(t.getMemory() > 0);
|
||||||
assertTrue(t.getVcores() > 0);
|
assertTrue(t.getVcores() > 0);
|
||||||
|
|
|
@ -36,7 +36,7 @@ public class TestSLSUtils {
|
||||||
@Test
|
@Test
|
||||||
public void testGetRackHostname() {
|
public void testGetRackHostname() {
|
||||||
String str = "/rack1/node1";
|
String str = "/rack1/node1";
|
||||||
String rackHostname[] = SLSUtils.getRackHostName(str);
|
String[] rackHostname = SLSUtils.getRackHostName(str);
|
||||||
Assert.assertEquals("rack1", rackHostname[0]);
|
Assert.assertEquals("rack1", rackHostname[0]);
|
||||||
Assert.assertEquals("node1", rackHostname[1]);
|
Assert.assertEquals("node1", rackHostname[1]);
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ public class TestSLSUtils {
|
||||||
} else if(nodeLabel.getName().equals("label2")) {
|
} else if(nodeLabel.getName().equals("label2")) {
|
||||||
Assert.assertFalse(nodeLabel.isExclusive());
|
Assert.assertFalse(nodeLabel.isExclusive());
|
||||||
} else {
|
} else {
|
||||||
Assert.assertTrue("Unexepected label", false);
|
Assert.fail("Unexpected label");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (nodeDetail.getHostname().equals("/rack1/node4")) {
|
} else if (nodeDetail.getHostname().equals("/rack1/node4")) {
|
||||||
|
|
Loading…
Reference in New Issue