diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 827d96fd714..f05ccc6f05c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -93,6 +93,8 @@ Release 2.0.5-beta - UNRELEASED MAPREDUCE-5077. Remove mapreduce.util.ResourceCalculatorPlugin and related code. (Karthik Kambatla via sseth) + MAPREDUCE-3008. Improvements to cumulative CPU emulation for short running + tasks in Gridmix. (amarrk via tgraves) Release 2.0.4-alpha - UNRELEASED diff --git a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/LoadJob.java b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/LoadJob.java index f52c1e41d75..3daf4abbe78 100644 --- a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/LoadJob.java +++ b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/LoadJob.java @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -72,7 +73,7 @@ class LoadJob extends GridmixJob { job.setNumReduceTasks(jobdesc.getNumberReduces()); job.setMapOutputKeyClass(GridmixKey.class); job.setMapOutputValueClass(GridmixRecord.class); - job.setSortComparatorClass(GridmixKey.Comparator.class); + job.setSortComparatorClass(LoadSortComparator.class); job.setGroupingComparatorClass(SpecGroupingComparator.class); job.setInputFormatClass(LoadInputFormat.class); job.setOutputFormatClass(RawBytesOutputFormat.class); @@ -93,18 +94,85 @@ class LoadJob extends GridmixJob { return true; } + /** + * This is a load matching key comparator which will make sure that the + * resource usage load is matched even when the framework is in control. + */ + public static class LoadSortComparator extends GridmixKey.Comparator { + private ResourceUsageMatcherRunner matcher = null; + private boolean isConfigured = false; + + public LoadSortComparator() { + super(); + } + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + configure(); + int ret = super.compare(b1, s1, l1, b2, s2, l2); + if (matcher != null) { + try { + matcher.match(); // match the resource usage now + } catch (Exception e) {} + } + return ret; + } + + //TODO Note that the sorter will be instantiated 2 times as follows + // 1. During the sort/spill in the map phase + // 2. During the merge in the sort phase + // We need the handle to the matcher thread only in (2). + // This logic can be relaxed to run only in (2). + private void configure() { + if (!isConfigured) { + ThreadGroup group = Thread.currentThread().getThreadGroup(); + Thread[] threads = new Thread[group.activeCount() * 2]; + group.enumerate(threads, true); + for (Thread t : threads) { + if (t != null && (t instanceof ResourceUsageMatcherRunner)) { + this.matcher = (ResourceUsageMatcherRunner) t; + isConfigured = true; + break; + } + } + } + } + } + /** * This is a progress based resource usage matcher. */ @SuppressWarnings("unchecked") - static class ResourceUsageMatcherRunner extends Thread { + static class ResourceUsageMatcherRunner extends Thread + implements Progressive { private final ResourceUsageMatcher matcher; - private final Progressive progress; + private final BoostingProgress progress; private final long sleepTime; private static final String SLEEP_CONFIG = "gridmix.emulators.resource-usage.sleep-duration"; private static final long DEFAULT_SLEEP_TIME = 100; // 100ms + /** + * This is a progress bar that can be boosted for weaker use-cases. + */ + private static class BoostingProgress implements Progressive { + private float boostValue = 0f; + TaskInputOutputContext context; + + BoostingProgress(TaskInputOutputContext context) { + this.context = context; + } + + void setBoostValue(float boostValue) { + this.boostValue = boostValue; + } + + @Override + public float getProgress() { + return Math.min(1f, context.getProgress() + boostValue); + } + } + ResourceUsageMatcherRunner(final TaskInputOutputContext context, ResourceUsageMetrics metrics) { Configuration conf = context.getConfiguration(); @@ -118,19 +186,14 @@ class LoadJob extends GridmixJob { // set the other parameters this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME); - progress = new Progressive() { - @Override - public float getProgress() { - return context.getProgress(); - } - }; + progress = new BoostingProgress(context); // instantiate a resource-usage-matcher matcher = new ResourceUsageMatcher(); matcher.configure(conf, plugin, metrics, progress); } - protected void match() throws Exception { + protected void match() throws IOException, InterruptedException { // match the resource usage matcher.matchResourceUsage(); } @@ -157,21 +220,34 @@ class LoadJob extends GridmixJob { + " thread! Exiting.", e); } } + + @Override + public float getProgress() { + return matcher.getProgress(); + } + + // boost the progress bar as fasten up the emulation cycles. + void boost(float value) { + progress.setBoostValue(value); + } } // Makes sure that the TaskTracker doesn't kill the map/reduce tasks while // they are emulating private static class StatusReporter extends Thread { - private TaskAttemptContext context; - StatusReporter(TaskAttemptContext context) { + private final TaskAttemptContext context; + private final Progressive progress; + + StatusReporter(TaskAttemptContext context, Progressive progress) { this.context = context; + this.progress = progress; } @Override public void run() { LOG.info("Status reporter thread started."); try { - while (context.getProgress() < 1) { + while (!isInterrupted() && progress.getProgress() < 1) { // report progress context.progress(); @@ -278,7 +354,7 @@ class LoadJob extends GridmixJob { matcher.setDaemon(true); // start the status reporter thread - reporter = new StatusReporter(ctxt); + reporter = new StatusReporter(ctxt, matcher); reporter.setDaemon(true); reporter.start(); } @@ -326,6 +402,17 @@ class LoadJob extends GridmixJob { } } + // check if the thread will get a chance to run or not + // check if there will be a sort&spill->merge phase or not + // check if the final sort&spill->merge phase is gonna happen or not + if (context.getNumReduceTasks() > 0 + && context.getCounter(TaskCounter.SPILLED_RECORDS).getValue() == 0) { + LOG.info("Boosting the map phase progress."); + // add the sort phase progress to the map phase and emulate + matcher.boost(0.33f); + matcher.match(); + } + // start the matcher thread since the map phase ends here matcher.start(); } @@ -394,7 +481,7 @@ class LoadJob extends GridmixJob { matcher = new ResourceUsageMatcherRunner(context, metrics); // start the status reporter thread - reporter = new StatusReporter(context); + reporter = new StatusReporter(context, matcher); reporter.start(); } @Override @@ -530,9 +617,13 @@ class LoadJob extends GridmixJob { specRecords[j] = info.getOutputRecords(); metrics[j] = info.getResourceUsageMetrics(); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i, + LOG.debug(String.format("SPEC(%d) %d -> %d %d %d %d %d %d %d", id(), i, i + j * maps, info.getOutputRecords(), - info.getOutputBytes())); + info.getOutputBytes(), + info.getResourceUsageMetrics().getCumulativeCpuUsage(), + info.getResourceUsageMetrics().getPhysicalMemoryUsage(), + info.getResourceUsageMetrics().getVirtualMemoryUsage(), + info.getResourceUsageMetrics().getHeapUsage())); } } final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i); diff --git a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java index 050b177adfa..7ed118e0c88 100644 --- a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java +++ b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java @@ -67,7 +67,7 @@ implements ResourceUsageEmulatorPlugin { private float emulationInterval; // emulation interval private long targetCpuUsage = 0; private float lastSeenProgress = 0; - private long lastSeenCpuUsageCpuUsage = 0; + private long lastSeenCpuUsage = 0; // Configuration parameters public static final String CPU_EMULATION_PROGRESS_INTERVAL = @@ -229,6 +229,16 @@ implements ResourceUsageEmulatorPlugin { return progress * progress * progress * progress; } + private synchronized long getCurrentCPUUsage() { + return monitor.getCumulativeCpuTime(); + + } + + @Override + public float getProgress() { + return Math.min(1f, ((float)getCurrentCPUUsage())/targetCpuUsage); + } + @Override //TODO Multi-threading for speedup? public void emulate() throws IOException, InterruptedException { @@ -249,10 +259,9 @@ implements ResourceUsageEmulatorPlugin { // Note that (Cc-Cl)/(Pc-Pl) is termed as 'rate' in the following // section - long currentCpuUsage = - monitor.getCumulativeCpuTime(); + long currentCpuUsage = getCurrentCPUUsage(); // estimate the cpu usage rate - float rate = (currentCpuUsage - lastSeenCpuUsageCpuUsage) + float rate = (currentCpuUsage - lastSeenCpuUsage) / (currentProgress - lastSeenProgress); long projectedUsage = currentCpuUsage + (long)((1 - currentProgress) * rate); @@ -264,8 +273,7 @@ implements ResourceUsageEmulatorPlugin { (long)(targetCpuUsage * getWeightForProgressInterval(currentProgress)); - while (monitor.getCumulativeCpuTime() - < currentWeighedTarget) { + while (getCurrentCPUUsage() < currentWeighedTarget) { emulatorCore.compute(); // sleep for 100ms try { @@ -281,8 +289,7 @@ implements ResourceUsageEmulatorPlugin { // set the last seen progress lastSeenProgress = progress.getProgress(); // set the last seen usage - lastSeenCpuUsageCpuUsage = - monitor.getCumulativeCpuTime(); + lastSeenCpuUsage = getCurrentCPUUsage(); } } } @@ -310,6 +317,6 @@ implements ResourceUsageEmulatorPlugin { // initialize the states lastSeenProgress = 0; - lastSeenCpuUsageCpuUsage = 0; + lastSeenCpuUsage = 0; } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java index 5e5da56b79c..593c1a462bf 100644 --- a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java +++ b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java @@ -42,7 +42,7 @@ import org.apache.hadoop.conf.Configuration; * For configuring GridMix to load and and use a resource usage emulator, * see {@link ResourceUsageMatcher}. */ -public interface ResourceUsageEmulatorPlugin { +public interface ResourceUsageEmulatorPlugin extends Progressive { /** * Initialize the plugin. This might involve * - initializing the variables diff --git a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java index 69a553f95e9..38095800111 100644 --- a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java +++ b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred.gridmix.emulators.resourceusage; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -35,7 +36,7 @@ import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; *
Note that the order in which the emulators are invoked is same as the * order in which they are configured. */ -public class ResourceUsageMatcher { +public class ResourceUsageMatcher implements Progressive { /** * Configuration key to set resource usage emulators. */ @@ -80,10 +81,31 @@ public class ResourceUsageMatcher { } } - public void matchResourceUsage() throws Exception { + public void matchResourceUsage() throws IOException, InterruptedException { for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) { // match the resource usage emulator.emulate(); } } + + /** + * Returns the average progress. + */ + @Override + public float getProgress() { + if (emulationPlugins.size() > 0) { + // return the average progress + float progress = 0f; + for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) { + // consider weighted progress of each emulator + progress += emulator.getProgress(); + } + + return progress / emulationPlugins.size(); + } + + // if no emulators are configured then return 1 + return 1f; + + } } \ No newline at end of file diff --git a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java index 5752269ef05..0ac8e7dc918 100644 --- a/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java +++ b/hadoop-tools/hadoop-gridmix/src/main/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java @@ -186,6 +186,11 @@ implements ResourceUsageEmulatorPlugin { return Runtime.getRuntime().maxMemory() / ONE_MB; } + @Override + public float getProgress() { + return Math.min(1f, ((float)getTotalHeapUsageInMB())/targetHeapUsageInMB); + } + @Override public void emulate() throws IOException, InterruptedException { if (enabled) { diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java index fadb205c479..feafafaaa37 100644 --- a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java @@ -133,6 +133,14 @@ public class TestResourceUsageEmulators { ? fs.getFileStatus(testPath).getModificationTime() : 0; } + + @Override + public float getProgress() { + try { + return fs.exists(touchPath) ? 1.0f : 0f; + } catch (IOException ioe) {} + return 0f; + } } /**