MAPREDUCE-2106. [Gridmix] Cumulative CPU usage emulation in Gridmix. (amarrk)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1135396 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Amar Kamat 2011-06-14 07:44:16 +00:00
parent 667c400245
commit 3fd40ae8d0
12 changed files with 1388 additions and 12 deletions

View File

@ -11,6 +11,9 @@ Trunk (unreleased changes)
NEW FEATURES
MAPREDUCE-2106. [Gridmix] Cumulative CPU usage emulation in Gridmix.
(amarrk)
MAPREDUCE-2543. [Gridmix] High-Ram feature emulation in Gridmix. (amarrk)
MAPREDUCE-2408. [Gridmix] Compression emulation in Gridmix. (amarrk)

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
class GridmixKey extends GridmixRecord {
static final byte REDUCE_SPEC = 0;
@ -115,6 +116,22 @@ public void setReduceOutputBytes(long b_out) {
setSize(origSize);
}
/**
* Get the {@link ResourceUsageMetrics} stored in the key.
*/
public ResourceUsageMetrics getReduceResourceUsageMetrics() {
assert REDUCE_SPEC == getType();
return spec.metrics;
}
/**
* Store the {@link ResourceUsageMetrics} in the key.
*/
public void setReduceResourceUsageMetrics(ResourceUsageMetrics metrics) {
assert REDUCE_SPEC == getType();
spec.setResourceUsageSpecification(metrics);
}
public byte getType() {
return type;
}
@ -195,18 +212,35 @@ public static class Spec implements Writable {
long rec_in;
long rec_out;
long bytes_out;
private ResourceUsageMetrics metrics = null;
private int sizeOfResourceUsageMetrics = 0;
public Spec() { }
public void set(Spec other) {
rec_in = other.rec_in;
bytes_out = other.bytes_out;
rec_out = other.rec_out;
setResourceUsageSpecification(other.metrics);
}
/**
* Sets the {@link ResourceUsageMetrics} for this {@link Spec}.
*/
public void setResourceUsageSpecification(ResourceUsageMetrics metrics) {
this.metrics = metrics;
if (metrics != null) {
this.sizeOfResourceUsageMetrics = metrics.size();
} else {
this.sizeOfResourceUsageMetrics = 0;
}
}
public int getSize() {
return WritableUtils.getVIntSize(rec_in) +
WritableUtils.getVIntSize(rec_out) +
WritableUtils.getVIntSize(bytes_out);
WritableUtils.getVIntSize(bytes_out) +
WritableUtils.getVIntSize(sizeOfResourceUsageMetrics) +
sizeOfResourceUsageMetrics;
}
@Override
@ -214,6 +248,11 @@ public void readFields(DataInput in) throws IOException {
rec_in = WritableUtils.readVLong(in);
rec_out = WritableUtils.readVLong(in);
bytes_out = WritableUtils.readVLong(in);
sizeOfResourceUsageMetrics = WritableUtils.readVInt(in);
if (sizeOfResourceUsageMetrics > 0) {
metrics = new ResourceUsageMetrics();
metrics.readFields(in);
}
}
@Override
@ -221,6 +260,10 @@ public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, rec_in);
WritableUtils.writeVLong(out, rec_out);
WritableUtils.writeVLong(out, bytes_out);
WritableUtils.writeVInt(out, sizeOfResourceUsageMetrics);
if (sizeOfResourceUsageMetrics > 0) {
metrics.write(out);
}
}
}

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.tools.rumen.ZombieJobProducer;
@ -111,7 +112,7 @@ static class MinTaskInfo extends TaskInfo {
public MinTaskInfo(TaskInfo info) {
super(info.getInputBytes(), info.getInputRecords(),
info.getOutputBytes(), info.getOutputRecords(),
info.getTaskMemory());
info.getTaskMemory(), info.getResourceUsageMetrics());
}
public long getInputBytes() {
return Math.max(0, super.getInputBytes());

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@ -31,10 +32,14 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.tools.rumen.TaskInfo;
import java.io.IOException;
@ -88,6 +93,101 @@ protected boolean canEmulateCompression() {
return true;
}
/**
* This is a progress-based resource usage matcher.
*/
@SuppressWarnings("unchecked")
static class ResourceUsageMatcherRunner extends Thread {
private final ResourceUsageMatcher matcher;
private final Progressive progress;
private final long sleepTime;
private static final String SLEEP_CONFIG =
"gridmix.emulators.resource-usage.sleep-duration";
private static final long DEFAULT_SLEEP_TIME = 100; // 100ms
ResourceUsageMatcherRunner(final TaskInputOutputContext context,
ResourceUsageMetrics metrics) {
Configuration conf = context.getConfiguration();
// set the resource calculator plugin
Class<? extends ResourceCalculatorPlugin> clazz =
conf.getClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
null, ResourceCalculatorPlugin.class);
ResourceCalculatorPlugin plugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
// set the other parameters
this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME);
progress = new Progressive() {
@Override
public float getProgress() {
return context.getProgress();
}
};
// instantiate a resource-usage-matcher
matcher = new ResourceUsageMatcher();
matcher.configure(conf, plugin, metrics, progress);
}
protected void match() throws Exception {
// match the resource usage
matcher.matchResourceUsage();
}
@Override
public void run() {
LOG.info("Resource usage matcher thread started.");
try {
while (progress.getProgress() < 1) {
// match
match();
// sleep for some time
try {
Thread.sleep(sleepTime);
} catch (Exception e) {}
}
// match for progress = 1
match();
LOG.info("Resource usage emulation complete! Matcher exiting");
} catch (Exception e) {
LOG.info("Exception while running the resource-usage-emulation matcher"
+ " thread! Exiting.", e);
}
}
}
// Makes sure that the TaskTracker doesn't kill the map/reduce tasks while
// they are emulating
private static class StatusReporter extends Thread {
private TaskAttemptContext context;
StatusReporter(TaskAttemptContext context) {
this.context = context;
}
@Override
public void run() {
LOG.info("Status reporter thread started.");
try {
while (context.getProgress() < 1) {
// report progress
context.progress();
// sleep for some time
try {
Thread.sleep(100); // sleep for 100ms
} catch (Exception e) {}
}
LOG.info("Status reporter thread exiting");
} catch (Exception e) {
LOG.info("Exception while running the status reporter thread!", e);
}
}
}
public static class LoadMapper
extends Mapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> {
@ -100,6 +200,9 @@ public static class LoadMapper
private final GridmixKey key = new GridmixKey();
private final GridmixRecord val = new GridmixRecord();
private ResourceUsageMatcherRunner matcher = null;
private StatusReporter reporter = null;
@Override
protected void setup(Context ctxt)
throws IOException, InterruptedException {
@ -133,6 +236,8 @@ protected void setup(Context ctxt)
if (i == id) {
spec.bytes_out = split.getReduceBytes(idx);
spec.rec_out = split.getReduceRecords(idx);
spec.setResourceUsageSpecification(
split.getReduceResourceUsageMetrics(idx));
++idx;
id += maps;
}
@ -167,6 +272,13 @@ protected void setup(Context ctxt)
: splitRecords;
ratio = totalRecords / (1.0 * inputRecords);
acc = 0.0;
matcher = new ResourceUsageMatcherRunner(ctxt,
split.getMapResourceUsageMetrics());
// start the status reporter thread
reporter = new StatusReporter(ctxt);
reporter.start();
}
@Override
@ -184,6 +296,13 @@ public void map(NullWritable ignored, GridmixRecord rec,
}
context.write(key, val);
acc -= 1.0;
// match inline
try {
matcher.match();
} catch (Exception e) {
LOG.debug("Error in resource usage emulation! Message: ", e);
}
}
}
@ -195,8 +314,18 @@ public void cleanup(Context context)
while (factory.next(key, val)) {
context.write(key, val);
key.setSeed(r.nextLong());
// match inline
try {
matcher.match();
} catch (Exception e) {
LOG.debug("Error in resource usage emulation! Message: ", e);
}
}
}
// start the matcher thread since the map phase ends here
matcher.start();
}
}
@ -210,6 +339,9 @@ public static class LoadReducer
private double ratio;
private RecordFactory factory;
private ResourceUsageMatcherRunner matcher = null;
private StatusReporter reporter = null;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
@ -220,11 +352,15 @@ protected void setup(Context context)
long outBytes = 0L;
long outRecords = 0L;
long inRecords = 0L;
ResourceUsageMetrics metrics = new ResourceUsageMetrics();
for (GridmixRecord ignored : context.getValues()) {
final GridmixKey spec = context.getCurrentKey();
inRecords += spec.getReduceInputRecords();
outBytes += spec.getReduceOutputBytes();
outRecords += spec.getReduceOutputRecords();
if (spec.getReduceResourceUsageMetrics() != null) {
metrics = spec.getReduceResourceUsageMetrics();
}
}
if (0 == outRecords && inRecords > 0) {
LOG.info("Spec output bytes w/o records. Using input record count");
@ -252,6 +388,12 @@ protected void setup(Context context)
context.getConfiguration(), 5*1024);
ratio = outRecords / (1.0 * inRecords);
acc = 0.0;
matcher = new ResourceUsageMatcherRunner(context, metrics);
// start the status reporter thread
reporter = new StatusReporter(context);
reporter.start();
}
@Override
protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
@ -262,6 +404,13 @@ protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
while (acc >= 1.0 && factory.next(null, val)) {
context.write(NullWritable.get(), val);
acc -= 1.0;
// match inline
try {
matcher.match();
} catch (Exception e) {
LOG.debug("Error in resource usage emulation! Message: ", e);
}
}
}
}
@ -272,6 +421,13 @@ protected void cleanup(Context context)
while (factory.next(null, val)) {
context.write(NullWritable.get(), val);
val.setSeed(r.nextLong());
// match inline
try {
matcher.match();
} catch (Exception e) {
LOG.debug("Error in resource usage emulation! Message: ", e);
}
}
}
}
@ -364,11 +520,13 @@ void buildSplits(FilePool inputDir) throws IOException {
final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
final long[] specBytes = new long[nSpec];
final long[] specRecords = new long[nSpec];
final ResourceUsageMetrics[] metrics = new ResourceUsageMetrics[nSpec];
for (int j = 0; j < nSpec; ++j) {
final TaskInfo info =
jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
specBytes[j] = info.getOutputBytes();
specRecords[j] = info.getOutputRecords();
metrics[j] = info.getResourceUsageMetrics();
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
i + j * maps, info.getOutputRecords(),
@ -381,7 +539,8 @@ void buildSplits(FilePool inputDir) throws IOException {
maps, i, info.getInputBytes(), info.getInputRecords(),
info.getOutputBytes(), info.getOutputRecords(),
reduceByteRatio, reduceRecordRatio, specBytes,
specRecords));
specRecords, info.getResourceUsageMetrics(),
metrics));
}
pushDescription(id(), splits);
}

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
class LoadSplit extends CombineFileSplit {
private int id;
@ -40,6 +41,9 @@ class LoadSplit extends CombineFileSplit {
private long[] reduceOutputBytes = new long[0];
private long[] reduceOutputRecords = new long[0];
private ResourceUsageMetrics mapMetrics;
private ResourceUsageMetrics[] reduceMetrics;
LoadSplit() {
super();
}
@ -47,7 +51,9 @@ class LoadSplit extends CombineFileSplit {
public LoadSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes,
long inputRecords, long outputBytes, long outputRecords,
double[] reduceBytes, double[] reduceRecords,
long[] reduceOutputBytes, long[] reduceOutputRecords)
long[] reduceOutputBytes, long[] reduceOutputRecords,
ResourceUsageMetrics metrics,
ResourceUsageMetrics[] rMetrics)
throws IOException {
super(cfsplit);
this.id = id;
@ -61,6 +67,8 @@ public LoadSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes,
nSpec = reduceOutputBytes.length;
this.reduceOutputBytes = reduceOutputBytes;
this.reduceOutputRecords = reduceOutputRecords;
this.mapMetrics = metrics;
this.reduceMetrics = rMetrics;
}
public int getId() {
@ -98,6 +106,15 @@ public long getReduceBytes(int i) {
public long getReduceRecords(int i) {
return reduceOutputRecords[i];
}
public ResourceUsageMetrics getMapResourceUsageMetrics() {
return mapMetrics;
}
public ResourceUsageMetrics getReduceResourceUsageMetrics(int i) {
return reduceMetrics[i];
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
@ -117,6 +134,12 @@ public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, reduceOutputBytes[i]);
WritableUtils.writeVLong(out, reduceOutputRecords[i]);
}
mapMetrics.write(out);
int numReduceMetrics = (reduceMetrics == null) ? 0 : reduceMetrics.length;
WritableUtils.writeVInt(out, numReduceMetrics);
for (int i = 0; i < numReduceMetrics; ++i) {
reduceMetrics[i].write(out);
}
}
@Override
public void readFields(DataInput in) throws IOException {
@ -145,5 +168,13 @@ public void readFields(DataInput in) throws IOException {
reduceOutputBytes[i] = WritableUtils.readVLong(in);
reduceOutputRecords[i] = WritableUtils.readVLong(in);
}
mapMetrics = new ResourceUsageMetrics();
mapMetrics.readFields(in);
int numReduceMetrics = WritableUtils.readVInt(in);
reduceMetrics = new ResourceUsageMetrics[numReduceMetrics];
for (int i = 0; i < numReduceMetrics; ++i) {
reduceMetrics[i] = new ResourceUsageMetrics();
reduceMetrics[i].readFields(in);
}
}
}

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
/**
* Used to track progress of tasks.
*/
public interface Progressive {
public float getProgress();
}

View File

@ -0,0 +1,315 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.gridmix.Progressive;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
/**
* <p>A {@link ResourceUsageEmulatorPlugin} that emulates the cumulative CPU
* usage by performing certain CPU intensive operations. Performing such CPU
* intensive operations essentially uses up some CPU. Every
{@link ResourceUsageEmulatorPlugin} is configured with a feedback module, i.e.
* a {@link ResourceCalculatorPlugin}, to monitor the resource usage.</p>
*
* <p>{@link CumulativeCpuUsageEmulatorPlugin} emulates the CPU usage in steps.
* The frequency of emulation can be configured via
* {@link #CPU_EMULATION_FREQUENCY}.
* CPU usage values are matched via emulation only on the interval boundaries.
* </p>
*
* {@link CumulativeCpuUsageEmulatorPlugin} is a wrapper program for managing
* the CPU usage emulation feature. It internally uses an emulation algorithm
* (called as core and described using {@link CpuUsageEmulatorCore}) for
* performing the actual emulation. Multiple calls to this core engine should
* use up some amount of CPU.<br>
*
* <p>{@link CumulativeCpuUsageEmulatorPlugin} provides a calibration feature
* via {@link #initialize(Configuration, ResourceUsageMetrics,
* ResourceCalculatorPlugin, Progressive)} to calibrate
* the plugin and its core for the underlying hardware. As a result of
* calibration, every call to the emulation engine's core should roughly use up
* 1% of the total usage value to be emulated. This makes sure that the
* underlying hardware is profiled before use and that the plugin doesn't
accidentally overuse the CPU. With 1% as the unit emulation target value for
* the core engine, there will be roughly 100 calls to the engine resulting in
* roughly 100 calls to the feedback (resource usage monitor) module.
Excessive use of the feedback module is discouraged as
the monitoring itself consumes CPU, which would leave no real CPU emulation.
* </p>
*/
public class CumulativeCpuUsageEmulatorPlugin
implements ResourceUsageEmulatorPlugin {
protected CpuUsageEmulatorCore emulatorCore;
private ResourceCalculatorPlugin monitor;
private Progressive progress;
private boolean enabled = true;
private float emulationInterval; // emulation interval
private long targetCpuUsage = 0;
private float lastSeenProgress = 0;
private long lastSeenCpuUsageCpuUsage = 0;
// Configuration parameters
public static final String CPU_EMULATION_FREQUENCY =
"gridmix.emulators.resource-usage.cpu.frequency";
private static final float DEFAULT_EMULATION_FREQUENCY = 0.1F; // 10 times
/**
* This is the core CPU usage emulation algorithm: an engine that actually
* performs some CPU intensive operations to consume some
* amount of CPU. Multiple calls of {@link #compute()} should help the
* plugin emulate the desired level of CPU usage. This core engine can be
* calibrated using the {@link #calibrate(ResourceCalculatorPlugin, long)}
* API to suit the underlying hardware better. It also can be used to optimize
* the emulation cycle.
*/
public interface CpuUsageEmulatorCore {
/**
* Performs some computation to use up some CPU.
*/
public void compute();
/**
* Allows the core to calibrate itself.
*/
public void calibrate(ResourceCalculatorPlugin monitor,
long totalCpuUsage);
}
/**
* This is the core engine to emulate the CPU usage. The only responsibility
* of this class is to perform certain math intensive operations to make sure
* that some desired value of CPU is used.
*/
public static class DefaultCpuUsageEmulator implements CpuUsageEmulatorCore {
// number of times to loop for performing the basic unit computation
private int numIterations;
private final Random random;
/**
* This is to fool the JVM and make it think that we need the value
* stored in the unit computation, i.e. {@link #compute()}. This will prevent
* the JVM from optimizing the code.
*/
protected double returnValue;
/**
* Initializes the {@link DefaultCpuUsageEmulator} with default values.
* Note that the {@link DefaultCpuUsageEmulator} should be calibrated
* (see {@link #calibrate(ResourceCalculatorPlugin, long)}) when initialized
* using this constructor.
*/
public DefaultCpuUsageEmulator() {
this(-1);
}
DefaultCpuUsageEmulator(int numIterations) {
this.numIterations = numIterations;
random = new Random();
}
/**
* This will consume some desired level of CPU. Each invocation will try to
* use up roughly 'X' percent of the target cumulative CPU usage. After
* calibration, X is roughly 1%.
*/
public void compute() {
for (int i = 0; i < numIterations; ++i) {
performUnitComputation();
}
}
// Perform unit computation. The complete CPU emulation will be based on
// multiple invocations to this unit computation module.
protected void performUnitComputation() {
//TODO can this be configurable too. Users/emulators should be able to
// pick and choose what MATH operations to run.
// Example :
// BASIC : ADD, SUB, MUL, DIV
// ADV : SQRT, SIN, COSIN..
// COMPO : (BASIC/ADV)*
// Also define input generator. For now we can use the random number
// generator. Later this can be changed to accept multiple sources.
int randomData = random.nextInt();
int randomDataCube = randomData * randomData * randomData;
double randomDataCubeRoot = Math.cbrt(randomData);
returnValue = Math.log(Math.tan(randomDataCubeRoot
* Math.exp(randomDataCube))
* Math.sqrt(randomData));
}
/**
* This will calibrate the algorithm such that a single invocation of
* {@link #compute()} emulates roughly 1% of the total desired resource
* usage value.
*/
public void calibrate(ResourceCalculatorPlugin monitor,
long totalCpuUsage) {
long initTime = monitor.getProcResourceValues().getCumulativeCpuTime();
long defaultLoopSize = 0;
long finalTime = initTime;
//TODO Make this configurable
while (finalTime - initTime < 100) { // 100 ms
++defaultLoopSize;
performUnitComputation(); //perform unit computation
finalTime = monitor.getProcResourceValues().getCumulativeCpuTime();
}
long referenceRuntime = finalTime - initTime;
// time for one loop = (final-time - init-time) / total-loops
float timePerLoop = ((float)referenceRuntime) / defaultLoopSize;
// compute the 1% of the total CPU usage desired
//TODO Make this configurable
long onePercent = totalCpuUsage / 100;
// num-iterations for 1% = (total-desired-usage / 100) / time-for-one-loop
numIterations = Math.max(1, (int)((float)onePercent/timePerLoop));
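// Worked example with hypothetical numbers: if one unit computation takes
// 0.5ms of CPU and totalCpuUsage is 100000ms, then onePercent = 1000ms and
// numIterations = 1000 / 0.5 = 2000 loops per compute() call.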
System.out.println("Calibration done. Basic computation runtime : "
+ timePerLoop + " milliseconds. Optimal number of iterations (1%): "
+ numIterations);
}
}
public CumulativeCpuUsageEmulatorPlugin() {
this(new DefaultCpuUsageEmulator());
}
/**
* For testing.
*/
public CumulativeCpuUsageEmulatorPlugin(CpuUsageEmulatorCore core) {
emulatorCore = core;
}
// Note that this weighting function uses only the current progress. In the
// future, this might depend on progress, emulation-interval and expected target.
private float getWeightForProgressInterval(float progress) {
// we want some kind of exponential growth function that gives less weight
// on lower progress boundaries but high (exact emulation) near progress
// value of 1.
// so here is what the current growth function looks like
// progress weight
// 0.1 0.0001
// 0.2 0.0016
// 0.3 0.0081
// 0.4 0.0256
// 0.5 0.0625
// 0.6 0.1296
// 0.7 0.2401
// 0.8 0.4096
// 0.9 0.6561
// 1.0 1.000
return progress * progress * progress * progress;
}
@Override
//TODO Multi-threading for speedup?
public void emulate() throws IOException, InterruptedException {
if (enabled) {
float currentProgress = progress.getProgress();
if (lastSeenProgress < currentProgress
&& ((currentProgress - lastSeenProgress) >= emulationInterval
|| currentProgress == 1)) {
// Estimate the final cpu usage
//
// Consider the following
// Cl/Cc/Cp : Last/Current/Projected Cpu usage
// Pl/Pc/Pp : Last/Current/Projected progress
// Then
// (Cp-Cc)/(Pp-Pc) = (Cc-Cl)/(Pc-Pl)
// Solving this for Cp, we get
// Cp = Cc + (1-Pc)*(Cc-Cl)/(Pc-Pl)
// Note that (Cc-Cl)/(Pc-Pl) is termed the 'rate' in the following
// section
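// Worked example with hypothetical numbers: if Cl = 100ms at Pl = 0.2 and
// Cc = 150ms at Pc = 0.3, then rate = (150-100)/(0.3-0.2) = 500 and
// Cp = 150 + (1-0.3)*500 = 500ms.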
long currentCpuUsage =
monitor.getProcResourceValues().getCumulativeCpuTime();
// estimate the cpu usage rate
float rate = (currentCpuUsage - lastSeenCpuUsageCpuUsage)
/ (currentProgress - lastSeenProgress);
long projectedUsage =
currentCpuUsage + (long)((1 - currentProgress) * rate);
if (projectedUsage < targetCpuUsage) {
// determine the correction factor between the current usage and the
// expected usage and add some weight to the target
long currentWeighedTarget =
(long)(targetCpuUsage
* getWeightForProgressInterval(currentProgress));
while (monitor.getProcResourceValues().getCumulativeCpuTime()
< currentWeighedTarget) {
emulatorCore.compute();
// sleep for 100ms
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
String message =
"CumulativeCpuUsageEmulatorPlugin got interrupted. Exiting.";
throw new RuntimeException(message);
}
}
}
// set the last seen progress
lastSeenProgress = progress.getProgress();
// set the last seen usage
lastSeenCpuUsageCpuUsage =
monitor.getProcResourceValues().getCumulativeCpuTime();
}
}
}
@Override
public void initialize(Configuration conf, ResourceUsageMetrics metrics,
ResourceCalculatorPlugin monitor,
Progressive progress) {
// get the target CPU usage
targetCpuUsage = metrics.getCumulativeCpuUsage();
if (targetCpuUsage <= 0 ) {
enabled = false;
return;
} else {
enabled = true;
}
this.monitor = monitor;
this.progress = progress;
emulationInterval = conf.getFloat(CPU_EMULATION_FREQUENCY,
DEFAULT_EMULATION_FREQUENCY);
// calibrate the core cpu-usage utility
emulatorCore.calibrate(monitor, targetCpuUsage);
// initialize the states
lastSeenProgress = 0;
lastSeenCpuUsageCpuUsage = 0;
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
import java.io.IOException;
import org.apache.hadoop.mapred.gridmix.Progressive;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.conf.Configuration;
/**
* <p>Each resource to be emulated should have a corresponding implementation
* class that implements {@link ResourceUsageEmulatorPlugin}.</p>
* <br><br>
* {@link ResourceUsageEmulatorPlugin} will be configured using the
* {@link #initialize(Configuration, ResourceUsageMetrics,
* ResourceCalculatorPlugin, Progressive)} call.
* Every
* {@link ResourceUsageEmulatorPlugin} is also configured with a feedback module
i.e. a {@link ResourceCalculatorPlugin}, to monitor the current resource
* usage. {@link ResourceUsageMetrics} decides the final resource usage value to
* emulate. {@link Progressive} keeps track of the task's progress.</p>
*
* <br><br>
*
For configuring GridMix to load and use a resource usage emulator,
* see {@link ResourceUsageMatcher}.
*/
public interface ResourceUsageEmulatorPlugin {
/**
* Initialize the plugin. This might involve
* - initializing the variables
* - calibrating the plugin
*/
void initialize(Configuration conf, ResourceUsageMetrics metrics,
ResourceCalculatorPlugin monitor,
Progressive progress);
/**
* Emulate the resource usage to match the usage target. The plugin can use
* the given {@link ResourceCalculatorPlugin} to query for the current
* resource usage.
* @throws IOException
* @throws InterruptedException
*/
void emulate() throws IOException, InterruptedException;
}
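For illustration, a minimal plugin implementing this interface might look like the sketch below. The class name and its logging-only behaviour are hypothetical and not part of this commit; a real plugin would drive resource consumption towards the target inside emulate().

package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.gridmix.Progressive;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;

/**
 * Hypothetical example plugin: records the target cumulative CPU usage on
 * initialization and only reports progress in emulate(), performing no real
 * emulation. It illustrates the initialize()/emulate() contract only.
 */
public class LoggingEmulatorPlugin implements ResourceUsageEmulatorPlugin {
  private long targetCpuUsage;  // target value published by Rumen
  private Progressive progress; // task progress feedback

  @Override
  public void initialize(Configuration conf, ResourceUsageMetrics metrics,
                         ResourceCalculatorPlugin monitor,
                         Progressive progress) {
    this.targetCpuUsage = metrics.getCumulativeCpuUsage();
    this.progress = progress;
  }

  @Override
  public void emulate() throws IOException, InterruptedException {
    // A real plugin would consume the resource here until the usage reported
    // by the feedback module approaches the target for the current progress.
    System.out.println("progress=" + progress.getProgress()
        + ", target cpu usage=" + targetCpuUsage);
  }
}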

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.gridmix.Progressive;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.util.ReflectionUtils;
/**
* <p>This is the driver class for managing all the resource usage emulators.
* {@link ResourceUsageMatcher} expects a comma separated list of
* {@link ResourceUsageEmulatorPlugin} implementations specified using
* {@link #RESOURCE_USAGE_EMULATION_PLUGINS} as the configuration parameter.</p>
*
* <p>Note that the order in which the emulators are invoked is the same as the
* order in which they are configured.</p>
*/
public class ResourceUsageMatcher {
/**
* Configuration key to set resource usage emulators.
*/
public static final String RESOURCE_USAGE_EMULATION_PLUGINS =
"gridmix.emulators.resource-usage.plugins";
private List<ResourceUsageEmulatorPlugin> emulationPlugins =
new ArrayList<ResourceUsageEmulatorPlugin>();
/**
* Configure the {@link ResourceUsageMatcher} to load the configured plugins
* and initialize them.
*/
@SuppressWarnings("unchecked")
public void configure(Configuration conf, ResourceCalculatorPlugin monitor,
ResourceUsageMetrics metrics, Progressive progress) {
Class[] plugins =
conf.getClasses(RESOURCE_USAGE_EMULATION_PLUGINS,
ResourceUsageEmulatorPlugin.class);
if (plugins == null) {
System.out.println("No resource usage emulator plugins configured.");
} else {
for (Class<? extends ResourceUsageEmulatorPlugin> plugin : plugins) {
if (plugin != null) {
emulationPlugins.add(ReflectionUtils.newInstance(plugin, conf));
}
}
}
// initialize the emulators once all the configured emulator plugins are
// loaded
for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
emulator.initialize(conf, metrics, monitor, progress);
}
}
public void matchResourceUsage() throws Exception {
for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
// match the resource usage
emulator.emulate();
}
}
}

View File

@ -176,7 +176,8 @@ static void checkSpec(GridmixKey a, GridmixKey b) throws Exception {
a.setReduceOutputBytes(out_bytes);
final int min = WritableUtils.getVIntSize(in_rec)
+ WritableUtils.getVIntSize(out_rec)
+ WritableUtils.getVIntSize(out_bytes);
+ WritableUtils.getVIntSize(out_bytes)
+ WritableUtils.getVIntSize(0);
assertEquals(min + 2, a.fixedBytes()); // meta + vint min
final int size = r.nextInt(1024) + a.fixedBytes() + 1;
setSerialize(a, r.nextLong(), size, out);
@ -207,7 +208,7 @@ static void setSerialize(GridmixRecord x, long seed, int size,
@Test
public void testKeySpec() throws Exception {
final int min = 5;
final int min = 6;
final int max = 300;
final GridmixKey a = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
final GridmixKey b = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);

View File

@ -0,0 +1,613 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import java.io.IOException;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin.ProcResourceValues;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.mapred.DummyResourceCalculatorPlugin;
import org.apache.hadoop.mapred.gridmix.LoadJob.ResourceUsageMatcherRunner;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageEmulatorPlugin;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin.DefaultCpuUsageEmulator;
/**
* Test Gridmix's resource emulator framework and supported plugins.
*/
public class TestResourceUsageEmulators {
/**
* A {@link ResourceUsageEmulatorPlugin} implementation for testing purposes.
* It essentially creates a file named 'test' in the test directory.
*/
static class TestResourceUsageEmulatorPlugin
implements ResourceUsageEmulatorPlugin {
static final Path rootTempDir =
new Path(System.getProperty("test.build.data", "/tmp"));
static final Path tempDir =
new Path(rootTempDir, "TestResourceUsageEmulatorPlugin");
static final String DEFAULT_IDENTIFIER = "test";
private Path touchPath = null;
private FileSystem fs = null;
@Override
public void emulate() throws IOException, InterruptedException {
// add some time between 2 calls to emulate()
try {
Thread.sleep(1000); // sleep for 1s
} catch (Exception e){}
try {
fs.delete(touchPath, false); // delete the touch file
//TODO Search for a better touch utility
fs.create(touchPath).close(); // recreate it
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected String getIdentifier() {
return DEFAULT_IDENTIFIER;
}
private static Path getFilePath(String id) {
return new Path(tempDir, id);
}
private static Path getInitFilePath(String id) {
return new Path(tempDir, id + ".init");
}
@Override
public void initialize(Configuration conf, ResourceUsageMetrics metrics,
ResourceCalculatorPlugin monitor, Progressive progress) {
// add some time between 2 calls to initialize()
try {
Thread.sleep(1000); // sleep for 1s
} catch (Exception e){}
try {
fs = FileSystem.getLocal(conf);
Path initPath = getInitFilePath(getIdentifier());
fs.delete(initPath, false); // delete the old file
fs.create(initPath).close(); // create a new one
touchPath = getFilePath(getIdentifier());
fs.delete(touchPath, false);
} catch (Exception e) {
} finally {
if (fs != null) {
try {
fs.deleteOnExit(tempDir);
} catch (IOException ioe){}
}
}
}
// test if the emulation framework successfully loaded this plugin
static long testInitialization(String id, Configuration conf)
throws IOException {
Path testPath = getInitFilePath(id);
FileSystem fs = FileSystem.getLocal(conf);
return fs.exists(testPath)
? fs.getFileStatus(testPath).getModificationTime()
: 0;
}
// test if the emulation framework successfully ran this plugin's emulate()
static long testEmulation(String id, Configuration conf)
throws IOException {
Path testPath = getFilePath(id);
FileSystem fs = FileSystem.getLocal(conf);
return fs.exists(testPath)
? fs.getFileStatus(testPath).getModificationTime()
: 0;
}
}
/**
* Test implementation of {@link ResourceUsageEmulatorPlugin} which creates
* a file named 'others' in the test directory.
*/
static class TestOthers extends TestResourceUsageEmulatorPlugin {
static final String ID = "others";
@Override
protected String getIdentifier() {
return ID;
}
}
/**
* Test implementation of {@link ResourceUsageEmulatorPlugin} which creates
* a file named 'cpu' in the test directory.
*/
static class TestCpu extends TestResourceUsageEmulatorPlugin {
static final String ID = "cpu";
@Override
protected String getIdentifier() {
return ID;
}
}
/**
* Test {@link ResourceUsageMatcher}.
*/
@Test
public void testResourceUsageMatcher() throws Exception {
ResourceUsageMatcher matcher = new ResourceUsageMatcher();
Configuration conf = new Configuration();
conf.setClass(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS,
TestResourceUsageEmulatorPlugin.class,
ResourceUsageEmulatorPlugin.class);
long currentTime = System.currentTimeMillis();
matcher.configure(conf, null, null, null);
matcher.matchResourceUsage();
String id = TestResourceUsageEmulatorPlugin.DEFAULT_IDENTIFIER;
long result =
TestResourceUsageEmulatorPlugin.testInitialization(id, conf);
assertTrue("Resource usage matcher failed to initialize the configured"
+ " plugin", result > currentTime);
result = TestResourceUsageEmulatorPlugin.testEmulation(id, conf);
assertTrue("Resource usage matcher failed to load and emulate the"
+ " configured plugin", result > currentTime);
// test plugin order to first emulate cpu and then others
conf.setStrings(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS,
TestCpu.class.getName() + "," + TestOthers.class.getName());
matcher.configure(conf, null, null, null);
// test the initialization order
long time1 =
TestResourceUsageEmulatorPlugin.testInitialization(TestCpu.ID, conf);
long time2 =
TestResourceUsageEmulatorPlugin.testInitialization(TestOthers.ID,
conf);
assertTrue("Resource usage matcher failed to initialize the configured"
+ " plugins in order", time1 < time2);
matcher.matchResourceUsage();
// Note that the cpu usage emulator plugin is configured first and then the
// others plugin.
time1 =
TestResourceUsageEmulatorPlugin.testInitialization(TestCpu.ID, conf);
time2 =
TestResourceUsageEmulatorPlugin.testInitialization(TestOthers.ID,
conf);
assertTrue("Resource usage matcher failed to load the configured plugins",
time1 < time2);
}
/**
* Fakes the cumulative usage using {@link FakeCpuUsageEmulatorCore}.
*/
static class FakeResourceUsageMonitor extends DummyResourceCalculatorPlugin {
private FakeCpuUsageEmulatorCore core;
public FakeResourceUsageMonitor(FakeCpuUsageEmulatorCore core) {
this.core = core;
}
/**
* A dummy CPU usage monitor. Every call to
* {@link ResourceCalculatorPlugin#getCumulativeCpuTime()} will return the
* value of {@link FakeCpuUsageEmulatorCore#getCpuUsage()}.
*/
@Override
public long getCumulativeCpuTime() {
return core.getCpuUsage();
}
/**
* Returns a {@link ProcResourceValues} with cumulative cpu usage
* computed using {@link #getCumulativeCpuTime()}.
*/
@Override
public ProcResourceValues getProcResourceValues() {
long usageValue = getCumulativeCpuTime();
return new ProcResourceValues(usageValue, -1, -1);
}
}
/**
* A dummy {@link Progressive} implementation that allows users to set the
* progress for testing. The {@link Progressive#getProgress()} call will
* return the last progress value set using
* {@link FakeProgressive#setProgress(float)}.
*/
static class FakeProgressive implements Progressive {
private float progress = 0F;
@Override
public float getProgress() {
return progress;
}
void setProgress(float progress) {
this.progress = progress;
}
}
/**
* A dummy reporter for {@link LoadJob.ResourceUsageMatcherRunner}.
*/
private static class DummyReporter extends StatusReporter {
private Progressive progress;
DummyReporter(Progressive progress) {
this.progress = progress;
}
@Override
public org.apache.hadoop.mapreduce.Counter getCounter(Enum<?> name) {
return null;
}
@Override
public org.apache.hadoop.mapreduce.Counter getCounter(String group,
String name) {
return null;
}
@Override
public void progress() {
}
@Override
public float getProgress() {
return progress.getProgress();
}
@Override
public void setStatus(String status) {
}
}
// Extends ResourceUsageMatcherRunner for testing.
@SuppressWarnings("unchecked")
private static class FakeResourceUsageMatcherRunner
extends ResourceUsageMatcherRunner {
FakeResourceUsageMatcherRunner(TaskInputOutputContext context,
ResourceUsageMetrics metrics) {
super(context, metrics);
}
// test ResourceUsageMatcherRunner
void test() throws Exception {
super.match();
}
}
/**
* Test {@link LoadJob.ResourceUsageMatcherRunner}.
*/
@Test
@SuppressWarnings("unchecked")
public void testResourceUsageMatcherRunner() throws Exception {
Configuration conf = new Configuration();
FakeProgressive progress = new FakeProgressive();
// set the resource calculator plugin
conf.setClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
DummyResourceCalculatorPlugin.class,
ResourceCalculatorPlugin.class);
// set the resources
// set the resource implementation class
conf.setClass(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS,
TestResourceUsageEmulatorPlugin.class,
ResourceUsageEmulatorPlugin.class);
long currentTime = System.currentTimeMillis();
// initialize the matcher class
TaskAttemptID id = new TaskAttemptID("test", 1, TaskType.MAP, 1, 1);
StatusReporter reporter = new DummyReporter(progress);
TaskInputOutputContext context =
new MapContextImpl(conf, id, null, null, null, reporter, null);
FakeResourceUsageMatcherRunner matcher =
new FakeResourceUsageMatcherRunner(context, null);
// check if the matcher initialized the plugin
String identifier = TestResourceUsageEmulatorPlugin.DEFAULT_IDENTIFIER;
long initTime =
TestResourceUsageEmulatorPlugin.testInitialization(identifier, conf);
assertTrue("ResourceUsageMatcherRunner failed to initialize the"
+ " configured plugin", initTime > currentTime);
// check the progress
assertEquals("Progress mismatch in ResourceUsageMatcherRunner",
0, progress.getProgress(), 0D);
// call match() and check progress
progress.setProgress(0.01f);
currentTime = System.currentTimeMillis();
matcher.test();
long emulateTime =
TestResourceUsageEmulatorPlugin.testEmulation(identifier, conf);
assertTrue("ProgressBasedResourceUsageMatcher failed to load and emulate"
+ " the configured plugin", emulateTime > currentTime);
}
/**
* Test {@link CumulativeCpuUsageEmulatorPlugin}'s core CPU usage emulation
* engine.
*/
@Test
public void testCpuUsageEmulator() throws IOException {
// test CpuUsageEmulator calibration with fake resource calculator plugin
long target = 100000L; // 100 secs
int unitUsage = 50;
FakeCpuUsageEmulatorCore fakeCpuEmulator = new FakeCpuUsageEmulatorCore();
fakeCpuEmulator.setUnitUsage(unitUsage);
FakeResourceUsageMonitor fakeMonitor =
new FakeResourceUsageMonitor(fakeCpuEmulator);
// calibrate for 100ms
fakeCpuEmulator.calibrate(fakeMonitor, target);
// by default, CpuUsageEmulator.calibrate() will consume 100ms of CPU usage
assertEquals("Fake calibration failed",
100, fakeMonitor.getCumulativeCpuTime());
assertEquals("Fake calibration failed",
100, fakeCpuEmulator.getCpuUsage());
// by default, CpuUsageEmulator.performUnitComputation() will be called
// twice
assertEquals("Fake calibration failed",
2, fakeCpuEmulator.getNumCalls());
}
/**
* This is a dummy class that fakes CPU usage.
*/
private static class FakeCpuUsageEmulatorCore
extends DefaultCpuUsageEmulator {
private int numCalls = 0;
private int unitUsage = 1;
private int cpuUsage = 0;
@Override
protected void performUnitComputation() {
++numCalls;
cpuUsage += unitUsage;
}
int getNumCalls() {
return numCalls;
}
int getCpuUsage() {
return cpuUsage;
}
void reset() {
numCalls = 0;
cpuUsage = 0;
}
void setUnitUsage(int unitUsage) {
this.unitUsage = unitUsage;
}
}
// Creates a ResourceUsageMetrics object from the target usage
private static ResourceUsageMetrics createMetrics(long target) {
ResourceUsageMetrics metrics = new ResourceUsageMetrics();
metrics.setCumulativeCpuUsage(target);
metrics.setVirtualMemoryUsage(target);
metrics.setPhysicalMemoryUsage(target);
metrics.setHeapUsage(target);
return metrics;
}
/**
* Test {@link CumulativeCpuUsageEmulatorPlugin}.
*/
@Test
public void testCumulativeCpuUsageEmulatorPlugin() throws Exception {
Configuration conf = new Configuration();
long targetCpuUsage = 1000L;
int unitCpuUsage = 50;
// fake progress indicator
FakeProgressive fakeProgress = new FakeProgressive();
// fake cpu usage generator
FakeCpuUsageEmulatorCore fakeCore = new FakeCpuUsageEmulatorCore();
fakeCore.setUnitUsage(unitCpuUsage);
// a cumulative cpu usage emulator with fake core
CumulativeCpuUsageEmulatorPlugin cpuPlugin =
new CumulativeCpuUsageEmulatorPlugin(fakeCore);
// test with invalid or missing resource usage value
ResourceUsageMetrics invalidUsage = createMetrics(0);
cpuPlugin.initialize(conf, invalidUsage, null, null);
// test if disabled cpu emulation plugin's emulate() call is a no-operation
// this will test if the emulation plugin is disabled or not
int numCallsPre = fakeCore.getNumCalls();
long cpuUsagePre = fakeCore.getCpuUsage();
cpuPlugin.emulate();
int numCallsPost = fakeCore.getNumCalls();
long cpuUsagePost = fakeCore.getCpuUsage();
// test that no calls are made to the cpu usage emulator core
assertEquals("Disabled cumulative CPU usage emulation plugin works!",
numCallsPre, numCallsPost);
// test that no cpu usage is accumulated by the emulator core
assertEquals("Disabled cumulative CPU usage emulation plugin works!",
cpuUsagePre, cpuUsagePost);
// test with valid resource usage value
ResourceUsageMetrics metrics = createMetrics(targetCpuUsage);
// fake monitor
ResourceCalculatorPlugin monitor = new FakeResourceUsageMonitor(fakeCore);
// test with default emulation interval
testEmulationAccuracy(conf, fakeCore, monitor, metrics, cpuPlugin,
targetCpuUsage, targetCpuUsage / unitCpuUsage);
// test with custom value for emulation interval of 20%
conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
0.2F);
testEmulationAccuracy(conf, fakeCore, monitor, metrics, cpuPlugin,
targetCpuUsage, targetCpuUsage / unitCpuUsage);
// test if emulation interval boundary is respected (unit usage = 1)
// test the case where the current progress is less than threshold
fakeProgress = new FakeProgressive(); // initialize
fakeCore.reset();
fakeCore.setUnitUsage(1);
conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
0.25F);
cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
// take a snapshot after the initialization
long initCpuUsage = monitor.getCumulativeCpuTime();
long initNumCalls = fakeCore.getNumCalls();
// test with 0 progress
testEmulationBoundary(0F, fakeCore, fakeProgress, cpuPlugin, initCpuUsage,
initNumCalls, "[no-op, 0 progress]");
// test with 24% progress
testEmulationBoundary(0.24F, fakeCore, fakeProgress, cpuPlugin,
initCpuUsage, initNumCalls, "[no-op, 24% progress]");
// test with 25% progress
// target = 1000ms, target emulation at 25% = 250ms,
// weighed target = 1000 * 0.25^4 (we are using progress^4 as the weight)
// ~ 4
// but current usage = init-usage = 100, hence expected = 100
testEmulationBoundary(0.25F, fakeCore, fakeProgress, cpuPlugin,
initCpuUsage, initNumCalls, "[op, 25% progress]");
// test with 80% progress
// target = 1000ms, target emulation at 80% = 800ms,
// weighed target = 1000 * 0.80^4 (we are using progress^4 as the weight)
// ~ 410
// current-usage = init-usage = 100, hence expected-usage = 410
testEmulationBoundary(0.80F, fakeCore, fakeProgress, cpuPlugin, 410, 410,
"[op, 80% progress]");
// now test if the final call with 100% progress ramps up the CPU usage
testEmulationBoundary(1F, fakeCore, fakeProgress, cpuPlugin, targetCpuUsage,
targetCpuUsage, "[op, 100% progress]");
// test if emulation interval boundary is respected (unit usage = 50)
// test the case where the current progress is less than threshold
fakeProgress = new FakeProgressive(); // initialize
fakeCore.reset();
fakeCore.setUnitUsage(unitCpuUsage);
conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
0.40F);
cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
// take a snapshot after the initialization
initCpuUsage = monitor.getCumulativeCpuTime();
initNumCalls = fakeCore.getNumCalls();
// test with 0 progress
testEmulationBoundary(0F, fakeCore, fakeProgress, cpuPlugin, initCpuUsage,
initNumCalls, "[no-op, 0 progress]");
// test with 39% progress
testEmulationBoundary(0.39F, fakeCore, fakeProgress, cpuPlugin,
initCpuUsage, initNumCalls, "[no-op, 39% progress]");
// test with 40% progress
// target = 1000ms, target emulation at 40% = 400ms,
// weighed target = 1000 * 0.40^4 (we are using progress^4 as the weight)
// ~ 26
// current-usage = init-usage = 100, hence expected-usage = 100
testEmulationBoundary(0.40F, fakeCore, fakeProgress, cpuPlugin,
initCpuUsage, initNumCalls, "[op, 40% progress]");
// test with 90% progress
// target = 1000ms, target emulation at 90% = 900ms,
// weighed target = 1000 * 0.90^4 (we are using progress^4 as the weight)
// ~ 657
// current-usage = init-usage = 100, hence expected-usage = 657 but
// the fake-core increases in steps of 50, hence final target = 700
testEmulationBoundary(0.90F, fakeCore, fakeProgress, cpuPlugin, 700,
700 / unitCpuUsage, "[op, 90% progress]");
// now test if the final call with 100% progress ramps up the CPU usage
testEmulationBoundary(1F, fakeCore, fakeProgress, cpuPlugin, targetCpuUsage,
targetCpuUsage / unitCpuUsage, "[op, 100% progress]");
}
// test whether the CPU usage emulator achieves the desired target using
// the desired number of calls to the underlying core engine.
private static void testEmulationAccuracy(Configuration conf,
FakeCpuUsageEmulatorCore fakeCore,
ResourceCalculatorPlugin monitor,
ResourceUsageMetrics metrics,
CumulativeCpuUsageEmulatorPlugin cpuPlugin,
long expectedTotalCpuUsage, long expectedTotalNumCalls)
throws Exception {
FakeProgressive fakeProgress = new FakeProgressive();
fakeCore.reset();
cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
int numLoops = 0;
while (fakeProgress.getProgress() < 1) {
++numLoops;
float progress = (float)numLoops / 100;
fakeProgress.setProgress(progress);
cpuPlugin.emulate();
}
// test if the resource plugin shows the expected invocations
assertEquals("Cumulative cpu usage emulator plugin failed (num calls)!",
expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
// test if the resource plugin shows the expected usage
assertEquals("Cumulative cpu usage emulator plugin failed (total usage)!",
expectedTotalCpuUsage, fakeCore.getCpuUsage(), 0L);
}
// tests if the CPU usage emulation plugin emulates only at the expected
// progress gaps
private static void testEmulationBoundary(float progress,
FakeCpuUsageEmulatorCore fakeCore, FakeProgressive fakeProgress,
CumulativeCpuUsageEmulatorPlugin cpuPlugin, long expectedTotalCpuUsage,
long expectedTotalNumCalls, String info) throws Exception {
fakeProgress.setProgress(progress);
cpuPlugin.emulate();
assertEquals("Emulation interval test for cpu usage failed " + info + "!",
expectedTotalCpuUsage, fakeCore.getCpuUsage(), 0L);
assertEquals("Emulation interval test for num calls failed " + info + "!",
expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
}
}

View File

@ -663,8 +663,53 @@ hadoop jar &lt;gridmix-jar&gt; org.apache.hadoop.mapred.gridmix.Gridmix \
</ul>
<p>High-Ram feature emulation can be disabled by setting
<code>gridmix.highram-emulation.enable</code> to
<code>false</code>. By default High-Ram feature emulation is enabled.
Note that this feature works only for jobs of type <em>LOADJOB</em>.
<code>false</code>.
</p>
</section>
<section id="resource-usage-emulation">
<title>Emulating resource usages</title>
<p>Usage of resources like CPU, physical memory, virtual memory, JVM heap
etc. is recorded by MapReduce using its task counters. This information
is used by GridMix to emulate the resource usages in the simulated
tasks. Emulating resource usages helps GridMix exert a load on the test
cluster similar to that seen on the actual cluster.
</p>
<p>MapReduce tasks use up resources during their entire lifetime. GridMix
also tries to mimic this behavior by spanning resource usage emulation
across the entire lifetime of the simulated task. Each resource to be
emulated should have an <em>emulator</em> associated with it.
Each such <em>emulator</em> should implement the
<code>org.apache.hadoop.mapred.gridmix.emulators.resourceusage
.ResourceUsageEmulatorPlugin</code> interface. Resource
<em>emulators</em> in GridMix are <em>plugins</em> that can be
configured (plugged in or out) before every run. GridMix users can
configure multiple emulator <em>plugins</em> by passing a comma-separated
list of <em>emulators</em> as a value for the
<code>gridmix.emulators.resource-usage.plugins</code> parameter.
</p>
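<p>For example, the plugins can be specified on the GridMix command line
like any other configuration property (an illustrative invocation; replace
the placeholders with the plugin classes and arguments for your run):
</p>
<source>
hadoop jar &lt;gridmix-jar&gt; org.apache.hadoop.mapred.gridmix.Gridmix \
  -Dgridmix.emulators.resource-usage.plugins=&lt;comma-separated-plugin-classes&gt; \
  &lt;other-gridmix-arguments&gt;
</source>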
<p>List of <em>emulators</em> shipped with GridMix:
</p>
<ul>
<li>Cumulative CPU usage <em>emulator</em>:
GridMix uses the cumulative CPU usage value published by Rumen
and makes sure that the total cumulative CPU usage of the simulated
task is close to the value published by Rumen. GridMix can be
configured to emulate cumulative CPU usage by adding
<code>org.apache.hadoop.mapred.gridmix.emulators.resourceusage
.CumulativeCpuUsageEmulatorPlugin</code> to the list of emulator
<em>plugins</em> configured for the
<code>gridmix.emulators.resource-usage.plugins</code> parameter.
The CPU usage emulator is designed so that it only emulates at specific
progress boundaries of the task. This interval can be configured using
<code>gridmix.emulators.resource-usage.cpu.frequency</code>. The
default value for this parameter is <code>0.1</code>, i.e.
<code>10%</code> (see the example following this list).
</li>
</ul>
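<p>For instance, to enable cumulative CPU usage emulation and match the
usage at every 20% of task progress, one could set the following
properties (the frequency value shown is illustrative):
</p>
<source>
gridmix.emulators.resource-usage.plugins=org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin
gridmix.emulators.resource-usage.cpu.frequency=0.2
</source>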
<p>Note that GridMix will emulate resource usages only for jobs of type
<em>LOADJOB</em>.
</p>
</section>
@ -677,10 +722,6 @@ hadoop jar &lt;gridmix-jar&gt; org.apache.hadoop.mapred.gridmix.Gridmix \
the following characteristics of job load are not currently captured in
job traces and cannot be accurately reproduced in GridMix:</p>
<ul>
<li><em>CPU Usage</em> - We have no data for per-task CPU usage, so we
cannot even attempt an approximation. GridMix tasks are never
CPU-bound independent of I/O, though this surely happens in
practice.</li>
<li><em>Filesystem Properties</em> - No attempt is made to match block
sizes, namespace hierarchies, or any property of input, intermediate
or output data other than the bytes/records consumed and emitted from