MAPREDUCE-3008. Improvements to cumulative CPU emulation for short running tasks in Gridmix. (amarrk via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1461269 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2013-03-26 19:00:53 +00:00
parent 340c65b9f0
commit e9440d688e
7 changed files with 165 additions and 30 deletions

View File

@ -93,6 +93,8 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5077. Remove mapreduce.util.ResourceCalculatorPlugin and related MAPREDUCE-5077. Remove mapreduce.util.ResourceCalculatorPlugin and related
code. (Karthik Kambatla via sseth) code. (Karthik Kambatla via sseth)
MAPREDUCE-3008. Improvements to cumulative CPU emulation for short running
tasks in Gridmix. (amarrk via tgraves)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@ -72,7 +73,7 @@ class LoadJob extends GridmixJob {
job.setNumReduceTasks(jobdesc.getNumberReduces()); job.setNumReduceTasks(jobdesc.getNumberReduces());
job.setMapOutputKeyClass(GridmixKey.class); job.setMapOutputKeyClass(GridmixKey.class);
job.setMapOutputValueClass(GridmixRecord.class); job.setMapOutputValueClass(GridmixRecord.class);
job.setSortComparatorClass(GridmixKey.Comparator.class); job.setSortComparatorClass(LoadSortComparator.class);
job.setGroupingComparatorClass(SpecGroupingComparator.class); job.setGroupingComparatorClass(SpecGroupingComparator.class);
job.setInputFormatClass(LoadInputFormat.class); job.setInputFormatClass(LoadInputFormat.class);
job.setOutputFormatClass(RawBytesOutputFormat.class); job.setOutputFormatClass(RawBytesOutputFormat.class);
@ -93,18 +94,85 @@ class LoadJob extends GridmixJob {
return true; return true;
} }
/**
* This is a load matching key comparator which will make sure that the
* resource usage load is matched even when the framework is in control.
*/
public static class LoadSortComparator extends GridmixKey.Comparator {
private ResourceUsageMatcherRunner matcher = null;
private boolean isConfigured = false;
public LoadSortComparator() {
super();
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
configure();
int ret = super.compare(b1, s1, l1, b2, s2, l2);
if (matcher != null) {
try {
matcher.match(); // match the resource usage now
} catch (Exception e) {}
}
return ret;
}
//TODO Note that the sorter will be instantiated 2 times as follows
// 1. During the sort/spill in the map phase
// 2. During the merge in the sort phase
// We need the handle to the matcher thread only in (2).
// This logic can be relaxed to run only in (2).
private void configure() {
if (!isConfigured) {
ThreadGroup group = Thread.currentThread().getThreadGroup();
Thread[] threads = new Thread[group.activeCount() * 2];
group.enumerate(threads, true);
for (Thread t : threads) {
if (t != null && (t instanceof ResourceUsageMatcherRunner)) {
this.matcher = (ResourceUsageMatcherRunner) t;
isConfigured = true;
break;
}
}
}
}
}
/** /**
* This is a progress based resource usage matcher. * This is a progress based resource usage matcher.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static class ResourceUsageMatcherRunner extends Thread { static class ResourceUsageMatcherRunner extends Thread
implements Progressive {
private final ResourceUsageMatcher matcher; private final ResourceUsageMatcher matcher;
private final Progressive progress; private final BoostingProgress progress;
private final long sleepTime; private final long sleepTime;
private static final String SLEEP_CONFIG = private static final String SLEEP_CONFIG =
"gridmix.emulators.resource-usage.sleep-duration"; "gridmix.emulators.resource-usage.sleep-duration";
private static final long DEFAULT_SLEEP_TIME = 100; // 100ms private static final long DEFAULT_SLEEP_TIME = 100; // 100ms
/**
* This is a progress bar that can be boosted for weaker use-cases.
*/
private static class BoostingProgress implements Progressive {
private float boostValue = 0f;
TaskInputOutputContext context;
BoostingProgress(TaskInputOutputContext context) {
this.context = context;
}
void setBoostValue(float boostValue) {
this.boostValue = boostValue;
}
@Override
public float getProgress() {
return Math.min(1f, context.getProgress() + boostValue);
}
}
ResourceUsageMatcherRunner(final TaskInputOutputContext context, ResourceUsageMatcherRunner(final TaskInputOutputContext context,
ResourceUsageMetrics metrics) { ResourceUsageMetrics metrics) {
Configuration conf = context.getConfiguration(); Configuration conf = context.getConfiguration();
@ -118,19 +186,14 @@ class LoadJob extends GridmixJob {
// set the other parameters // set the other parameters
this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME); this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME);
progress = new Progressive() { progress = new BoostingProgress(context);
@Override
public float getProgress() {
return context.getProgress();
}
};
// instantiate a resource-usage-matcher // instantiate a resource-usage-matcher
matcher = new ResourceUsageMatcher(); matcher = new ResourceUsageMatcher();
matcher.configure(conf, plugin, metrics, progress); matcher.configure(conf, plugin, metrics, progress);
} }
protected void match() throws Exception { protected void match() throws IOException, InterruptedException {
// match the resource usage // match the resource usage
matcher.matchResourceUsage(); matcher.matchResourceUsage();
} }
@ -157,21 +220,34 @@ class LoadJob extends GridmixJob {
+ " thread! Exiting.", e); + " thread! Exiting.", e);
} }
} }
@Override
public float getProgress() {
return matcher.getProgress();
}
// boost the progress bar as fasten up the emulation cycles.
void boost(float value) {
progress.setBoostValue(value);
}
} }
// Makes sure that the TaskTracker doesn't kill the map/reduce tasks while // Makes sure that the TaskTracker doesn't kill the map/reduce tasks while
// they are emulating // they are emulating
private static class StatusReporter extends Thread { private static class StatusReporter extends Thread {
private TaskAttemptContext context; private final TaskAttemptContext context;
StatusReporter(TaskAttemptContext context) { private final Progressive progress;
StatusReporter(TaskAttemptContext context, Progressive progress) {
this.context = context; this.context = context;
this.progress = progress;
} }
@Override @Override
public void run() { public void run() {
LOG.info("Status reporter thread started."); LOG.info("Status reporter thread started.");
try { try {
while (context.getProgress() < 1) { while (!isInterrupted() && progress.getProgress() < 1) {
// report progress // report progress
context.progress(); context.progress();
@ -278,7 +354,7 @@ class LoadJob extends GridmixJob {
matcher.setDaemon(true); matcher.setDaemon(true);
// start the status reporter thread // start the status reporter thread
reporter = new StatusReporter(ctxt); reporter = new StatusReporter(ctxt, matcher);
reporter.setDaemon(true); reporter.setDaemon(true);
reporter.start(); reporter.start();
} }
@ -326,6 +402,17 @@ class LoadJob extends GridmixJob {
} }
} }
// check if the thread will get a chance to run or not
// check if there will be a sort&spill->merge phase or not
// check if the final sort&spill->merge phase is gonna happen or not
if (context.getNumReduceTasks() > 0
&& context.getCounter(TaskCounter.SPILLED_RECORDS).getValue() == 0) {
LOG.info("Boosting the map phase progress.");
// add the sort phase progress to the map phase and emulate
matcher.boost(0.33f);
matcher.match();
}
// start the matcher thread since the map phase ends here // start the matcher thread since the map phase ends here
matcher.start(); matcher.start();
} }
@ -394,7 +481,7 @@ class LoadJob extends GridmixJob {
matcher = new ResourceUsageMatcherRunner(context, metrics); matcher = new ResourceUsageMatcherRunner(context, metrics);
// start the status reporter thread // start the status reporter thread
reporter = new StatusReporter(context); reporter = new StatusReporter(context, matcher);
reporter.start(); reporter.start();
} }
@Override @Override
@ -530,9 +617,13 @@ class LoadJob extends GridmixJob {
specRecords[j] = info.getOutputRecords(); specRecords[j] = info.getOutputRecords();
metrics[j] = info.getResourceUsageMetrics(); metrics[j] = info.getResourceUsageMetrics();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i, LOG.debug(String.format("SPEC(%d) %d -> %d %d %d %d %d %d %d", id(), i,
i + j * maps, info.getOutputRecords(), i + j * maps, info.getOutputRecords(),
info.getOutputBytes())); info.getOutputBytes(),
info.getResourceUsageMetrics().getCumulativeCpuUsage(),
info.getResourceUsageMetrics().getPhysicalMemoryUsage(),
info.getResourceUsageMetrics().getVirtualMemoryUsage(),
info.getResourceUsageMetrics().getHeapUsage()));
} }
} }
final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i); final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);

View File

@ -67,7 +67,7 @@ implements ResourceUsageEmulatorPlugin {
private float emulationInterval; // emulation interval private float emulationInterval; // emulation interval
private long targetCpuUsage = 0; private long targetCpuUsage = 0;
private float lastSeenProgress = 0; private float lastSeenProgress = 0;
private long lastSeenCpuUsageCpuUsage = 0; private long lastSeenCpuUsage = 0;
// Configuration parameters // Configuration parameters
public static final String CPU_EMULATION_PROGRESS_INTERVAL = public static final String CPU_EMULATION_PROGRESS_INTERVAL =
@ -229,6 +229,16 @@ implements ResourceUsageEmulatorPlugin {
return progress * progress * progress * progress; return progress * progress * progress * progress;
} }
private synchronized long getCurrentCPUUsage() {
return monitor.getCumulativeCpuTime();
}
@Override
public float getProgress() {
return Math.min(1f, ((float)getCurrentCPUUsage())/targetCpuUsage);
}
@Override @Override
//TODO Multi-threading for speedup? //TODO Multi-threading for speedup?
public void emulate() throws IOException, InterruptedException { public void emulate() throws IOException, InterruptedException {
@ -249,10 +259,9 @@ implements ResourceUsageEmulatorPlugin {
// Note that (Cc-Cl)/(Pc-Pl) is termed as 'rate' in the following // Note that (Cc-Cl)/(Pc-Pl) is termed as 'rate' in the following
// section // section
long currentCpuUsage = long currentCpuUsage = getCurrentCPUUsage();
monitor.getCumulativeCpuTime();
// estimate the cpu usage rate // estimate the cpu usage rate
float rate = (currentCpuUsage - lastSeenCpuUsageCpuUsage) float rate = (currentCpuUsage - lastSeenCpuUsage)
/ (currentProgress - lastSeenProgress); / (currentProgress - lastSeenProgress);
long projectedUsage = long projectedUsage =
currentCpuUsage + (long)((1 - currentProgress) * rate); currentCpuUsage + (long)((1 - currentProgress) * rate);
@ -264,8 +273,7 @@ implements ResourceUsageEmulatorPlugin {
(long)(targetCpuUsage (long)(targetCpuUsage
* getWeightForProgressInterval(currentProgress)); * getWeightForProgressInterval(currentProgress));
while (monitor.getCumulativeCpuTime() while (getCurrentCPUUsage() < currentWeighedTarget) {
< currentWeighedTarget) {
emulatorCore.compute(); emulatorCore.compute();
// sleep for 100ms // sleep for 100ms
try { try {
@ -281,8 +289,7 @@ implements ResourceUsageEmulatorPlugin {
// set the last seen progress // set the last seen progress
lastSeenProgress = progress.getProgress(); lastSeenProgress = progress.getProgress();
// set the last seen usage // set the last seen usage
lastSeenCpuUsageCpuUsage = lastSeenCpuUsage = getCurrentCPUUsage();
monitor.getCumulativeCpuTime();
} }
} }
} }
@ -310,6 +317,6 @@ implements ResourceUsageEmulatorPlugin {
// initialize the states // initialize the states
lastSeenProgress = 0; lastSeenProgress = 0;
lastSeenCpuUsageCpuUsage = 0; lastSeenCpuUsage = 0;
} }
} }

View File

@ -42,7 +42,7 @@ import org.apache.hadoop.conf.Configuration;
* For configuring GridMix to load and and use a resource usage emulator, * For configuring GridMix to load and and use a resource usage emulator,
* see {@link ResourceUsageMatcher}. * see {@link ResourceUsageMatcher}.
*/ */
public interface ResourceUsageEmulatorPlugin { public interface ResourceUsageEmulatorPlugin extends Progressive {
/** /**
* Initialize the plugin. This might involve * Initialize the plugin. This might involve
* - initializing the variables * - initializing the variables

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.mapred.gridmix.emulators.resourceusage; package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -35,7 +36,7 @@ import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
* <p>Note that the order in which the emulators are invoked is same as the * <p>Note that the order in which the emulators are invoked is same as the
* order in which they are configured. * order in which they are configured.
*/ */
public class ResourceUsageMatcher { public class ResourceUsageMatcher implements Progressive {
/** /**
* Configuration key to set resource usage emulators. * Configuration key to set resource usage emulators.
*/ */
@ -80,10 +81,31 @@ public class ResourceUsageMatcher {
} }
} }
public void matchResourceUsage() throws Exception { public void matchResourceUsage() throws IOException, InterruptedException {
for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) { for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
// match the resource usage // match the resource usage
emulator.emulate(); emulator.emulate();
} }
} }
/**
* Returns the average progress.
*/
@Override
public float getProgress() {
if (emulationPlugins.size() > 0) {
// return the average progress
float progress = 0f;
for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
// consider weighted progress of each emulator
progress += emulator.getProgress();
}
return progress / emulationPlugins.size();
}
// if no emulators are configured then return 1
return 1f;
}
} }

View File

@ -186,6 +186,11 @@ implements ResourceUsageEmulatorPlugin {
return Runtime.getRuntime().maxMemory() / ONE_MB; return Runtime.getRuntime().maxMemory() / ONE_MB;
} }
@Override
public float getProgress() {
return Math.min(1f, ((float)getTotalHeapUsageInMB())/targetHeapUsageInMB);
}
@Override @Override
public void emulate() throws IOException, InterruptedException { public void emulate() throws IOException, InterruptedException {
if (enabled) { if (enabled) {

View File

@ -133,6 +133,14 @@ public class TestResourceUsageEmulators {
? fs.getFileStatus(testPath).getModificationTime() ? fs.getFileStatus(testPath).getModificationTime()
: 0; : 0;
} }
@Override
public float getProgress() {
try {
return fs.exists(touchPath) ? 1.0f : 0f;
} catch (IOException ioe) {}
return 0f;
}
} }
/** /**