diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt
index 6ee80555d78..c414114ad86 100644
--- a/mapreduce/CHANGES.txt
+++ b/mapreduce/CHANGES.txt
@@ -11,6 +11,9 @@ Trunk (unreleased changes)
NEW FEATURES
+ MAPREDUCE-2106. [Gridmix] Cumulative CPU usage emulation in Gridmix.
+ (amarrk)
+
MAPREDUCE-2543. [Gridmix] High-Ram feature emulation in Gridmix. (amarrk)
MAPREDUCE-2408. [Gridmix] Compression emulation in Gridmix. (amarrk)
diff --git a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
index 4b212651da0..e03e1b9d3cc 100644
--- a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
+++ b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
class GridmixKey extends GridmixRecord {
static final byte REDUCE_SPEC = 0;
@@ -115,6 +116,22 @@ public void setReduceOutputBytes(long b_out) {
setSize(origSize);
}
+ /**
+ * Get the {@link ResourceUsageMetrics} stored in the key.
+ */
+ public ResourceUsageMetrics getReduceResourceUsageMetrics() {
+ assert REDUCE_SPEC == getType();
+ return spec.metrics;
+ }
+
+ /**
+ * Store the {@link ResourceUsageMetrics} in the key.
+ */
+ public void setReduceResourceUsageMetrics(ResourceUsageMetrics metrics) {
+ assert REDUCE_SPEC == getType();
+ spec.setResourceUsageSpecification(metrics);
+ }
+
public byte getType() {
return type;
}
@@ -195,18 +212,35 @@ public static class Spec implements Writable {
long rec_in;
long rec_out;
long bytes_out;
+ private ResourceUsageMetrics metrics = null;
+ private int sizeOfResourceUsageMetrics = 0;
public Spec() { }
public void set(Spec other) {
rec_in = other.rec_in;
bytes_out = other.bytes_out;
rec_out = other.rec_out;
+ setResourceUsageSpecification(other.metrics);
}
+    /**
+     * Stores the given {@link ResourceUsageMetrics} in this {@link Spec} and
+     * records its {@code size()} next to it; a null value records size 0.
+     *
+     * @param metrics the resource usage specification to store, may be null
+     */
+    public void setResourceUsageSpecification(ResourceUsageMetrics metrics) {
+      this.metrics = metrics;
+      // a missing specification is recorded as a zero-length payload
+      this.sizeOfResourceUsageMetrics = (metrics == null) ? 0 : metrics.size();
+    }
+
public int getSize() {
return WritableUtils.getVIntSize(rec_in) +
WritableUtils.getVIntSize(rec_out) +
- WritableUtils.getVIntSize(bytes_out);
+ WritableUtils.getVIntSize(bytes_out) +
+ WritableUtils.getVIntSize(sizeOfResourceUsageMetrics) +
+ sizeOfResourceUsageMetrics;
}
@Override
@@ -214,6 +248,11 @@ public void readFields(DataInput in) throws IOException {
rec_in = WritableUtils.readVLong(in);
rec_out = WritableUtils.readVLong(in);
bytes_out = WritableUtils.readVLong(in);
+      sizeOfResourceUsageMetrics = WritableUtils.readVInt(in);
+      final boolean hasMetrics = sizeOfResourceUsageMetrics > 0;
+      // always overwrite: a reused Spec must not keep an earlier read's metrics
+      metrics = hasMetrics ? new ResourceUsageMetrics() : null;
+      if (hasMetrics) { metrics.readFields(in); }
}
@Override
@@ -221,6 +260,10 @@ public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, rec_in);
WritableUtils.writeVLong(out, rec_out);
WritableUtils.writeVLong(out, bytes_out);
+ WritableUtils.writeVInt(out, sizeOfResourceUsageMetrics);
+ if (sizeOfResourceUsageMetrics > 0) {
+ metrics.write(out);
+ }
}
}
diff --git a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
index 1fac32c2a5b..57681002e87 100644
--- a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
+++ b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.tools.rumen.TaskInfo;
import org.apache.hadoop.tools.rumen.ZombieJobProducer;
@@ -111,7 +112,7 @@ static class MinTaskInfo extends TaskInfo {
public MinTaskInfo(TaskInfo info) {
super(info.getInputBytes(), info.getInputRecords(),
info.getOutputBytes(), info.getOutputRecords(),
- info.getTaskMemory());
+ info.getTaskMemory(), info.getResourceUsageMetrics());
}
public long getInputBytes() {
return Math.max(0, super.getInputBytes());
diff --git a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
index c090a5c7e26..e48106890fc 100644
--- a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
+++ b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -31,10 +32,14 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.tools.rumen.TaskInfo;
import java.io.IOException;
@@ -88,6 +93,101 @@ protected boolean canEmulateCompression() {
return true;
}
+ /**
+ * This is a progress based resource usage matcher.
+ */
+ @SuppressWarnings("unchecked")
+ static class ResourceUsageMatcherRunner extends Thread {
+ private final ResourceUsageMatcher matcher;
+ private final Progressive progress;
+ private final long sleepTime;
+ private static final String SLEEP_CONFIG =
+ "gridmix.emulators.resource-usage.sleep-duration";
+ private static final long DEFAULT_SLEEP_TIME = 100; // 100ms
+
+    /**
+     * Builds a matcher for the task behind {@code context}, targeting the
+     * usage described by {@code metrics}.
+     */
+    ResourceUsageMatcherRunner(final TaskInputOutputContext context,
+                               ResourceUsageMetrics metrics) {
+      Configuration conf = context.getConfiguration();
+      // load the configured resource calculator (feedback) plugin, if any
+      Class<? extends ResourceCalculatorPlugin> clazz =
+        conf.getClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
+                      null, ResourceCalculatorPlugin.class);
+      ResourceCalculatorPlugin plugin =
+        ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
+      // pause between two matching attempts of the background thread
+      this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME);
+      progress = new Progressive() {
+        @Override
+        public float getProgress() {
+          return context.getProgress();
+        }
+      };
+      matcher = new ResourceUsageMatcher();
+      matcher.configure(conf, plugin, metrics, progress);
+    }
+
+ protected void match() throws Exception {
+ // match the resource usage
+ matcher.matchResourceUsage();
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Resource usage matcher thread started.");
+ try {
+ while (progress.getProgress() < 1) {
+ // match
+ match();
+
+ // sleep for some time
+ try {
+ Thread.sleep(sleepTime);
+ } catch (Exception e) {}
+ }
+
+ // match for progress = 1
+ match();
+ LOG.info("Resource usage emulation complete! Matcher exiting");
+ } catch (Exception e) {
+ LOG.info("Exception while running the resource-usage-emulation matcher"
+ + " thread! Exiting.", e);
+ }
+ }
+ }
+
+ // Makes sure that the TaskTracker doesn't kill the map/reduce tasks while
+ // they are emulating
+ private static class StatusReporter extends Thread {
+ private TaskAttemptContext context;
+ StatusReporter(TaskAttemptContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Status reporter thread started.");
+ try {
+ while (context.getProgress() < 1) {
+ // report progress
+ context.progress();
+
+ // sleep for some time
+ try {
+ Thread.sleep(100); // sleep for 100ms
+ } catch (Exception e) {}
+ }
+
+ LOG.info("Status reporter thread exiting");
+ } catch (Exception e) {
+ LOG.info("Exception while running the status reporter thread!", e);
+ }
+ }
+ }
+
public static class LoadMapper
extends Mapper {
@@ -100,6 +200,9 @@ public static class LoadMapper
private final GridmixKey key = new GridmixKey();
private final GridmixRecord val = new GridmixRecord();
+ private ResourceUsageMatcherRunner matcher = null;
+ private StatusReporter reporter = null;
+
@Override
protected void setup(Context ctxt)
throws IOException, InterruptedException {
@@ -133,6 +236,8 @@ protected void setup(Context ctxt)
if (i == id) {
spec.bytes_out = split.getReduceBytes(idx);
spec.rec_out = split.getReduceRecords(idx);
+ spec.setResourceUsageSpecification(
+ split.getReduceResourceUsageMetrics(idx));
++idx;
id += maps;
}
@@ -167,6 +272,13 @@ protected void setup(Context ctxt)
: splitRecords;
ratio = totalRecords / (1.0 * inputRecords);
acc = 0.0;
+
+ matcher = new ResourceUsageMatcherRunner(ctxt,
+ split.getMapResourceUsageMetrics());
+
+ // start the status reporter thread
+ reporter = new StatusReporter(ctxt);
+ reporter.start();
}
@Override
@@ -184,6 +296,13 @@ public void map(NullWritable ignored, GridmixRecord rec,
}
context.write(key, val);
acc -= 1.0;
+
+ // match inline
+ try {
+ matcher.match();
+ } catch (Exception e) {
+ LOG.debug("Error in resource usage emulation! Message: ", e);
+ }
}
}
@@ -195,8 +314,18 @@ public void cleanup(Context context)
while (factory.next(key, val)) {
context.write(key, val);
key.setSeed(r.nextLong());
+
+ // match inline
+ try {
+ matcher.match();
+ } catch (Exception e) {
+ LOG.debug("Error in resource usage emulation! Message: ", e);
+ }
}
}
+
+ // start the matcher thread since the map phase ends here
+ matcher.start();
}
}
@@ -210,6 +339,9 @@ public static class LoadReducer
private double ratio;
private RecordFactory factory;
+ private ResourceUsageMatcherRunner matcher = null;
+ private StatusReporter reporter = null;
+
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
@@ -220,11 +352,15 @@ protected void setup(Context context)
long outBytes = 0L;
long outRecords = 0L;
long inRecords = 0L;
+ ResourceUsageMetrics metrics = new ResourceUsageMetrics();
for (GridmixRecord ignored : context.getValues()) {
final GridmixKey spec = context.getCurrentKey();
inRecords += spec.getReduceInputRecords();
outBytes += spec.getReduceOutputBytes();
outRecords += spec.getReduceOutputRecords();
+ if (spec.getReduceResourceUsageMetrics() != null) {
+ metrics = spec.getReduceResourceUsageMetrics();
+ }
}
if (0 == outRecords && inRecords > 0) {
LOG.info("Spec output bytes w/o records. Using input record count");
@@ -252,6 +388,12 @@ protected void setup(Context context)
context.getConfiguration(), 5*1024);
ratio = outRecords / (1.0 * inRecords);
acc = 0.0;
+
+ matcher = new ResourceUsageMatcherRunner(context, metrics);
+
+ // start the status reporter thread
+ reporter = new StatusReporter(context);
+ reporter.start();
}
@Override
protected void reduce(GridmixKey key, Iterable values,
@@ -262,6 +404,13 @@ protected void reduce(GridmixKey key, Iterable values,
while (acc >= 1.0 && factory.next(null, val)) {
context.write(NullWritable.get(), val);
acc -= 1.0;
+
+ // match inline
+ try {
+ matcher.match();
+ } catch (Exception e) {
+ LOG.debug("Error in resource usage emulation! Message: ", e);
+ }
}
}
}
@@ -272,6 +421,13 @@ protected void cleanup(Context context)
while (factory.next(null, val)) {
context.write(NullWritable.get(), val);
val.setSeed(r.nextLong());
+
+ // match inline
+ try {
+ matcher.match();
+ } catch (Exception e) {
+ LOG.debug("Error in resource usage emulation! Message: ", e);
+ }
}
}
}
@@ -364,11 +520,13 @@ void buildSplits(FilePool inputDir) throws IOException {
final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
final long[] specBytes = new long[nSpec];
final long[] specRecords = new long[nSpec];
+ final ResourceUsageMetrics[] metrics = new ResourceUsageMetrics[nSpec];
for (int j = 0; j < nSpec; ++j) {
final TaskInfo info =
jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
specBytes[j] = info.getOutputBytes();
specRecords[j] = info.getOutputRecords();
+ metrics[j] = info.getResourceUsageMetrics();
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
i + j * maps, info.getOutputRecords(),
@@ -381,7 +539,8 @@ void buildSplits(FilePool inputDir) throws IOException {
maps, i, info.getInputBytes(), info.getInputRecords(),
info.getOutputBytes(), info.getOutputRecords(),
reduceByteRatio, reduceRecordRatio, specBytes,
- specRecords));
+ specRecords, info.getResourceUsageMetrics(),
+ metrics));
}
pushDescription(id(), splits);
}
diff --git a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
index c5e3330c481..27e75473a70 100644
--- a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
+++ b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
class LoadSplit extends CombineFileSplit {
private int id;
@@ -40,6 +41,9 @@ class LoadSplit extends CombineFileSplit {
private long[] reduceOutputBytes = new long[0];
private long[] reduceOutputRecords = new long[0];
+ private ResourceUsageMetrics mapMetrics;
+ private ResourceUsageMetrics[] reduceMetrics;
+
LoadSplit() {
super();
}
@@ -47,7 +51,9 @@ class LoadSplit extends CombineFileSplit {
public LoadSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes,
long inputRecords, long outputBytes, long outputRecords,
double[] reduceBytes, double[] reduceRecords,
- long[] reduceOutputBytes, long[] reduceOutputRecords)
+ long[] reduceOutputBytes, long[] reduceOutputRecords,
+ ResourceUsageMetrics metrics,
+ ResourceUsageMetrics[] rMetrics)
throws IOException {
super(cfsplit);
this.id = id;
@@ -61,6 +67,8 @@ public LoadSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes,
nSpec = reduceOutputBytes.length;
this.reduceOutputBytes = reduceOutputBytes;
this.reduceOutputRecords = reduceOutputRecords;
+ this.mapMetrics = metrics;
+ this.reduceMetrics = rMetrics;
}
public int getId() {
@@ -98,6 +106,15 @@ public long getReduceBytes(int i) {
public long getReduceRecords(int i) {
return reduceOutputRecords[i];
}
+
+ public ResourceUsageMetrics getMapResourceUsageMetrics() {
+ return mapMetrics;
+ }
+
+ public ResourceUsageMetrics getReduceResourceUsageMetrics(int i) {
+ return reduceMetrics[i];
+ }
+
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
@@ -117,6 +134,12 @@ public void write(DataOutput out) throws IOException {
WritableUtils.writeVLong(out, reduceOutputBytes[i]);
WritableUtils.writeVLong(out, reduceOutputRecords[i]);
}
+ mapMetrics.write(out);
+ int numReduceMetrics = (reduceMetrics == null) ? 0 : reduceMetrics.length;
+ WritableUtils.writeVInt(out, numReduceMetrics);
+ for (int i = 0; i < numReduceMetrics; ++i) {
+ reduceMetrics[i].write(out);
+ }
}
@Override
public void readFields(DataInput in) throws IOException {
@@ -145,5 +168,13 @@ public void readFields(DataInput in) throws IOException {
reduceOutputBytes[i] = WritableUtils.readVLong(in);
reduceOutputRecords[i] = WritableUtils.readVLong(in);
}
+ mapMetrics = new ResourceUsageMetrics();
+ mapMetrics.readFields(in);
+ int numReduceMetrics = WritableUtils.readVInt(in);
+ reduceMetrics = new ResourceUsageMetrics[numReduceMetrics];
+ for (int i = 0; i < numReduceMetrics; ++i) {
+ reduceMetrics[i] = new ResourceUsageMetrics();
+ reduceMetrics[i].readFields(in);
+ }
}
}
diff --git a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Progressive.java b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Progressive.java
new file mode 100644
index 00000000000..4f1399e4fe4
--- /dev/null
+++ b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Progressive.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+/**
+ * Used to track progress of tasks.
+ */
+public interface Progressive {
+ public float getProgress();
+}
\ No newline at end of file
diff --git a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java
new file mode 100644
index 00000000000..1d362d0bc15
--- /dev/null
+++ b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.gridmix.Progressive;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+
+/**
+ * A {@link ResourceUsageEmulatorPlugin} that emulates the cumulative CPU
+ * usage by performing certain CPU intensive operations. Performing such CPU
+ * intensive operations essentially uses up some CPU. Every
+ * {@link ResourceUsageEmulatorPlugin} is configured with a feedback module i.e
+ * a {@link ResourceCalculatorPlugin}, to monitor the resource usage.
+ *
+ * {@link CumulativeCpuUsageEmulatorPlugin} emulates the CPU usage in steps.
+ * The frequency of emulation can be configured via
+ * {@link #CPU_EMULATION_FREQUENCY}.
+ * CPU usage values are matched via emulation only on the interval boundaries.
+ *
+ *
+ * {@link CumulativeCpuUsageEmulatorPlugin} is a wrapper program for managing
+ * the CPU usage emulation feature. It internally uses an emulation algorithm
+ * (called the core and described by {@link CpuUsageEmulatorCore}) for
+ * performing the actual emulation. Multiple calls to this core engine should
+ * use up some amount of CPU.
+ *
+ * {@link CumulativeCpuUsageEmulatorPlugin} provides a calibration feature
+ * via {@link #initialize(Configuration, ResourceUsageMetrics,
+ * ResourceCalculatorPlugin, Progressive)} to calibrate
+ * the plugin and its core for the underlying hardware. As a result of
+ * calibration, every call to the emulation engine's core should roughly use up
+ * 1% of the total usage value to be emulated. This makes sure that the
+ * underlying hardware is profiled before use and that the plugin doesn't
+ * accidentally overuse the CPU. With 1% as the unit emulation target value for
+ * the core engine, there will be roughly 100 calls to the engine resulting in
+ * roughly 100 calls to the feedback (resource usage monitor) module.
+ * Excessive usage of the feedback module is discouraged as
+ * it might result in excess CPU usage, resulting in no real CPU emulation.
+ *
+ */
+public class CumulativeCpuUsageEmulatorPlugin
+implements ResourceUsageEmulatorPlugin {
+ protected CpuUsageEmulatorCore emulatorCore;
+ private ResourceCalculatorPlugin monitor;
+ private Progressive progress;
+ private boolean enabled = true;
+ private float emulationInterval; // emulation interval
+ private long targetCpuUsage = 0;
+ private float lastSeenProgress = 0;
+ private long lastSeenCpuUsageCpuUsage = 0;
+
+ // Configuration parameters
+ public static final String CPU_EMULATION_FREQUENCY =
+ "gridmix.emulators.resource-usage.cpu.frequency";
+ private static final float DEFAULT_EMULATION_FREQUENCY = 0.1F; // 10 times
+
+ /**
+ * This is the core CPU usage emulation algorithm. This is the core engine
+ * which actually performs some CPU intensive operations to consume some
+ * amount of CPU. Multiple calls of {@link #compute()} should help the
+ * plugin emulate the desired level of CPU usage. This core engine can be
+ * calibrated using the {@link #calibrate(ResourceCalculatorPlugin, long)}
+ * API to suit the underlying hardware better. It also can be used to optimize
+ * the emulation cycle.
+ */
+ public interface CpuUsageEmulatorCore {
+ /**
+ * Performs some computation to use up some CPU.
+ */
+ public void compute();
+
+ /**
+ * Allows the core to calibrate itself.
+ */
+ public void calibrate(ResourceCalculatorPlugin monitor,
+ long totalCpuUsage);
+ }
+
+ /**
+ * This is the core engine to emulate the CPU usage. The only responsibility
+ * of this class is to perform certain math intensive operations to make sure
+ * that some desired value of CPU is used.
+ */
+ public static class DefaultCpuUsageEmulator implements CpuUsageEmulatorCore {
+ // number of times to loop for performing the basic unit computation
+ private int numIterations;
+ private final Random random;
+
+ /**
+ * This is to fool the JVM and make it think that we need the value
+ * stored in the unit computation i.e {@link #compute()}. This will prevent
+ * the JVM from optimizing the code.
+ */
+ protected double returnValue;
+
+ /**
+ * Initializes the {@link DefaultCpuUsageEmulator} with default values.
+ * Note that the {@link DefaultCpuUsageEmulator} should be calibrated
+ * (see {@link #calibrate(ResourceCalculatorPlugin, long)}) when initialized
+ * using this constructor.
+ */
+ public DefaultCpuUsageEmulator() {
+ this(-1);
+ }
+
+ DefaultCpuUsageEmulator(int numIterations) {
+ this.numIterations = numIterations;
+ random = new Random();
+ }
+
+ /**
+ * This will consume some desired level of CPU. This API will try to use up
+ * 'X' percent of the target cumulative CPU usage. After calibration X is
+ * roughly 1%.
+ */
+ public void compute() {
+ for (int i = 0; i < numIterations; ++i) {
+ performUnitComputation();
+ }
+ }
+
+ // Perform unit computation. The complete CPU emulation will be based on
+ // multiple invocations to this unit computation module.
+ protected void performUnitComputation() {
+ //TODO can this be configurable too. Users/emulators should be able to
+ // pick and choose what MATH operations to run.
+ // Example :
+ // BASIC : ADD, SUB, MUL, DIV
+ // ADV : SQRT, SIN, COSIN..
+ // COMPO : (BASIC/ADV)*
+ // Also define input generator. For now we can use the random number
+ // generator. Later this can be changed to accept multiple sources.
+
+ int randomData = random.nextInt();
+ int randomDataCube = randomData * randomData * randomData;
+ double randomDataCubeRoot = Math.cbrt(randomData);
+ returnValue = Math.log(Math.tan(randomDataCubeRoot
+ * Math.exp(randomDataCube))
+ * Math.sqrt(randomData));
+ }
+
+ /**
+ * This will calibrate the algorithm such that a single invocation of
+ * {@link #compute()} emulates roughly 1% of the total desired resource
+ * usage value.
+ */
+ public void calibrate(ResourceCalculatorPlugin monitor,
+ long totalCpuUsage) {
+ long initTime = monitor.getProcResourceValues().getCumulativeCpuTime();
+
+ long defaultLoopSize = 0;
+ long finalTime = initTime;
+
+ //TODO Make this configurable
+ while (finalTime - initTime < 100) { // 100 ms
+ ++defaultLoopSize;
+ performUnitComputation(); //perform unit computation
+ finalTime = monitor.getProcResourceValues().getCumulativeCpuTime();
+ }
+
+ long referenceRuntime = finalTime - initTime;
+
+ // time for one loop = (final-time - init-time) / total-loops
+ float timePerLoop = ((float)referenceRuntime) / defaultLoopSize;
+
+ // compute the 1% of the total CPU usage desired
+ //TODO Make this configurable
+ long onePercent = totalCpuUsage / 100;
+
+ // num-iterations for 1% = (total-desired-usage / 100) / time-for-one-loop
+ numIterations = Math.max(1, (int)((float)onePercent/timePerLoop));
+
+ System.out.println("Calibration done. Basic computation runtime : "
+ + timePerLoop + " milliseconds. Optimal number of iterations (1%): "
+ + numIterations);
+ }
+ }
+
+ public CumulativeCpuUsageEmulatorPlugin() {
+ this(new DefaultCpuUsageEmulator());
+ }
+
+ /**
+ * For testing.
+ */
+ public CumulativeCpuUsageEmulatorPlugin(CpuUsageEmulatorCore core) {
+ emulatorCore = core;
+ }
+
+ // Note that this weighing function uses only the current progress. In future,
+ // this might depend on progress, emulation-interval and expected target.
+ private float getWeightForProgressInterval(float progress) {
+ // we want some kind of exponential growth function that gives less weight
+ // on lower progress boundaries but high (exact emulation) near progress
+ // value of 1.
+ // so here is how the current growth function looks like
+ // progress weight
+ // 0.1 0.0001
+ // 0.2 0.0016
+ // 0.3 0.0081
+ // 0.4 0.0256
+ // 0.5 0.0625
+ // 0.6 0.1296
+ // 0.7 0.2401
+ // 0.8 0.4096
+ // 0.9 0.6561
+ // 1.0 1.000
+
+ return progress * progress * progress * progress;
+ }
+
+  @Override
+  //TODO Multi-threading for speedup?
+  public void emulate() throws IOException, InterruptedException {
+    if (enabled) {
+      float currentProgress = progress.getProgress();
+      if (lastSeenProgress < currentProgress
+          && ((currentProgress - lastSeenProgress) >= emulationInterval
+              || currentProgress == 1)) {
+        // Estimate the final cpu usage
+        //
+        // Consider the following
+        //   Cl/Cc/Cp : Last/Current/Projected Cpu usage
+        //   Pl/Pc/Pp : Last/Current/Projected progress
+        // Then
+        //   (Cp-Cc)/(Pp-Pc) = (Cc-Cl)/(Pc-Pl)
+        // Solving this for Cp with Pp = 1, we get
+        //   Cp = Cc + (1-Pc)*(Cc-Cl)/(Pc-Pl)
+        // Note that (Cc-Cl)/(Pc-Pl) is termed as 'rate' in the following
+        // section
+
+        long currentCpuUsage =
+          monitor.getProcResourceValues().getCumulativeCpuTime();
+        // estimate the cpu usage rate
+        float rate = (currentCpuUsage - lastSeenCpuUsageCpuUsage)
+                     / (currentProgress - lastSeenProgress);
+        long projectedUsage =
+          currentCpuUsage + (long)((1 - currentProgress) * rate);
+
+        if (projectedUsage < targetCpuUsage) {
+          // projected usage falls short of the target: consume CPU until the
+          // usage reaches the progress-weighted share of the target
+          long currentWeighedTarget =
+            (long)(targetCpuUsage
+                   * getWeightForProgressInterval(currentProgress));
+          while (monitor.getProcResourceValues().getCumulativeCpuTime()
+                 < currentWeighedTarget) {
+            emulatorCore.compute();
+            // sleep for 100ms
+            try {
+              Thread.sleep(100);
+            } catch (InterruptedException ie) {
+              String message =
+                "CumulativeCpuUsageEmulatorPlugin got interrupted. Exiting.";
+              // chain the interrupt as the cause so it is not lost
+              throw new RuntimeException(message, ie);
+            }
+          }
+        }
+
+        // set the last seen progress
+        lastSeenProgress = progress.getProgress();
+        // set the last seen usage
+        lastSeenCpuUsageCpuUsage =
+          monitor.getProcResourceValues().getCumulativeCpuTime();
+      }
+    }
+  }
+
+  @Override
+  public void initialize(Configuration conf, ResourceUsageMetrics metrics,
+                         ResourceCalculatorPlugin monitor,
+                         Progressive progress) {
+    // the cumulative CPU usage this plugin has to reach
+    targetCpuUsage = metrics.getCumulativeCpuUsage();
+    // emulation is switched on only for a positive target
+    enabled = targetCpuUsage > 0;
+    if (!enabled) {
+      return;
+    }
+
+    // remember the feedback module and the progress source
+    this.monitor = monitor;
+    this.progress = progress;
+    emulationInterval =
+      conf.getFloat(CPU_EMULATION_FREQUENCY, DEFAULT_EMULATION_FREQUENCY);
+
+    // calibrate the core cpu-usage utility for the target
+    emulatorCore.calibrate(monitor, targetCpuUsage);
+
+    // start tracking from scratch
+    lastSeenProgress = 0;
+    lastSeenCpuUsageCpuUsage = 0;
+  }
+}
\ No newline at end of file
diff --git a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java
new file mode 100644
index 00000000000..7d40cfd5e7e
--- /dev/null
+++ b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageEmulatorPlugin.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.gridmix.Progressive;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Each resource to be emulated should have a corresponding implementation
+ * class that implements {@link ResourceUsageEmulatorPlugin}.
+ *
+ * {@link ResourceUsageEmulatorPlugin} will be configured using the
+ * {@link #initialize(Configuration, ResourceUsageMetrics,
+ * ResourceCalculatorPlugin, Progressive)} call.
+ * Every
+ * {@link ResourceUsageEmulatorPlugin} is also configured with a feedback module
+ * i.e a {@link ResourceCalculatorPlugin}, to monitor the current resource
+ * usage. {@link ResourceUsageMetrics} decides the final resource usage value to
+ * emulate. {@link Progressive} keeps track of the task's progress.
+ *
+ *
+ *
+ * For configuring GridMix to load and use a resource usage emulator,
+ * see {@link ResourceUsageMatcher}.
+ */
+public interface ResourceUsageEmulatorPlugin {
+ /**
+ * Initialize the plugin. This might involve
+ * - initializing the variables
+ * - calibrating the plugin
+ */
+ void initialize(Configuration conf, ResourceUsageMetrics metrics,
+ ResourceCalculatorPlugin monitor,
+ Progressive progress);
+
+ /**
+ * Emulate the resource usage to match the usage target. The plugin can use
+ * the given {@link ResourceCalculatorPlugin} to query for the current
+ * resource usage.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void emulate() throws IOException, InterruptedException;
+}
diff --git a/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java
new file mode 100644
index 00000000000..10d6e733f1c
--- /dev/null
+++ b/mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/ResourceUsageMatcher.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.gridmix.Progressive;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This is the driver class for managing all the resource usage emulators.
+ * {@link ResourceUsageMatcher} expects a comma separated list of
+ * {@link ResourceUsageEmulatorPlugin} implementations specified using
+ * {@link #RESOURCE_USAGE_EMULATION_PLUGINS} as the configuration parameter.
+ *
+ * Note that the order in which the emulators are invoked is same as the
+ * order in which they are configured.
+ */
+public class ResourceUsageMatcher {
+  /**
+   * Configuration key to set resource usage emulators.
+   */
+  public static final String RESOURCE_USAGE_EMULATION_PLUGINS =
+    "gridmix.emulators.resource-usage.plugins";
+
+  private List<ResourceUsageEmulatorPlugin> emulationPlugins =
+    new ArrayList<ResourceUsageEmulatorPlugin>();
+
+  /**
+   * Load the plugins listed under {@link #RESOURCE_USAGE_EMULATION_PLUGINS}
+   * and append them to the plugins loaded by earlier calls; then initialize
+   * every loaded plugin, in order, with the given arguments.
+   */
+  @SuppressWarnings("unchecked")
+  public void configure(Configuration conf, ResourceCalculatorPlugin monitor,
+                        ResourceUsageMetrics metrics, Progressive progress) {
+    // Pass no default: it would be returned whenever the key is unset, and
+    // the plugin interface itself cannot be instantiated.
+    Class[] plugins = conf.getClasses(RESOURCE_USAGE_EMULATION_PLUGINS);
+    if (plugins == null || plugins.length == 0) {
+      System.out.println("No resource usage emulator plugins configured.");
+    } else {
+      for (Class<? extends ResourceUsageEmulatorPlugin> plugin : plugins) {
+        if (plugin != null) {
+          emulationPlugins.add(ReflectionUtils.newInstance(plugin, conf));
+        }
+      }
+    }
+
+    // initialize only after every configured plugin has been loaded
+    for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
+      emulator.initialize(conf, metrics, monitor, progress);
+    }
+  }
+
+  /** Call {@link ResourceUsageEmulatorPlugin#emulate()} on each plugin. */
+  public void matchResourceUsage() throws Exception {
+    for (ResourceUsageEmulatorPlugin emulator : emulationPlugins) {
+      emulator.emulate();
+    }
+  }
+}
\ No newline at end of file
diff --git a/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java b/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
index c283e8609c4..2f3ce701d6b 100644
--- a/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
+++ b/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixRecord.java
@@ -176,7 +176,8 @@ static void checkSpec(GridmixKey a, GridmixKey b) throws Exception {
a.setReduceOutputBytes(out_bytes);
final int min = WritableUtils.getVIntSize(in_rec)
+ WritableUtils.getVIntSize(out_rec)
- + WritableUtils.getVIntSize(out_bytes);
+ + WritableUtils.getVIntSize(out_bytes)
+ + WritableUtils.getVIntSize(0);
assertEquals(min + 2, a.fixedBytes()); // meta + vint min
final int size = r.nextInt(1024) + a.fixedBytes() + 1;
setSerialize(a, r.nextLong(), size, out);
@@ -207,7 +208,7 @@ static void setSerialize(GridmixRecord x, long seed, int size,
@Test
public void testKeySpec() throws Exception {
- final int min = 5;
+ final int min = 6;
final int max = 300;
final GridmixKey a = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
final GridmixKey b = new GridmixKey(GridmixKey.REDUCE_SPEC, 1, 0L);
diff --git a/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java b/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java
new file mode 100644
index 00000000000..8dd38820984
--- /dev/null
+++ b/mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin.ProcResourceValues;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+import org.apache.hadoop.mapred.DummyResourceCalculatorPlugin;
+import org.apache.hadoop.mapred.gridmix.LoadJob.ResourceUsageMatcherRunner;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageEmulatorPlugin;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.CumulativeCpuUsageEmulatorPlugin.DefaultCpuUsageEmulator;
+
+/**
+ * Test Gridmix's resource emulator framework and supported plugins.
+ */
+public class TestResourceUsageEmulators {
+ /**
+ * A {@link ResourceUsageEmulatorPlugin} implementation for testing purpose.
+ * It essentially creates a file named 'test' in the test directory.
+ */
+ static class TestResourceUsageEmulatorPlugin
+ implements ResourceUsageEmulatorPlugin {
+ static final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp"));
+ static final Path tempDir =
+ new Path(rootTempDir, "TestResourceUsageEmulatorPlugin");
+ static final String DEFAULT_IDENTIFIER = "test";
+
+ private Path touchPath = null;
+ private FileSystem fs = null;
+
+ @Override
+ public void emulate() throws IOException, InterruptedException {
+ // add some time between 2 calls to emulate(); an interrupt is propagated
+ // to the caller rather than swallowed
+ Thread.sleep(1000); // sleep for 1s
+
+ try {
+ fs.delete(touchPath, false); // delete the touch file
+ //TODO Search for a better touch utility
+ fs.create(touchPath).close(); // recreate it
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected String getIdentifier() {
+ return DEFAULT_IDENTIFIER;
+ }
+
+ private static Path getFilePath(String id) {
+ return new Path(tempDir, id);
+ }
+
+ private static Path getInitFilePath(String id) {
+ return new Path(tempDir, id + ".init");
+ }
+
+ @Override
+ public void initialize(Configuration conf, ResourceUsageMetrics metrics,
+ ResourceCalculatorPlugin monitor, Progressive progress) {
+ // add some time between 2 calls to initialize()
+ try {
+ Thread.sleep(1000); // sleep for 1s
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt(); // keep the interrupt status
+ }
+
+ try {
+ fs = FileSystem.getLocal(conf);
+ Path initPath = getInitFilePath(getIdentifier());
+ fs.delete(initPath, false); // delete the old file
+ fs.create(initPath).close(); // create a new one
+
+ touchPath = getFilePath(getIdentifier());
+ fs.delete(touchPath, false);
+ } catch (Exception e) {
+ // deliberately ignored; the tests check the created files instead
+ } finally {
+ if (fs != null) {
+ try {
+ fs.deleteOnExit(tempDir);
+ } catch (IOException ioe){}
+ }
+ }
+ }
+
+ // test if the emulation framework successfully loaded this plugin
+ static long testInitialization(String id, Configuration conf)
+ throws IOException {
+ Path testPath = getInitFilePath(id);
+ FileSystem fs = FileSystem.getLocal(conf);
+ return fs.exists(testPath)
+ ? fs.getFileStatus(testPath).getModificationTime()
+ : 0;
+ }
+
+ // test if the emulation framework successfully emulated using this plugin
+ static long testEmulation(String id, Configuration conf)
+ throws IOException {
+ Path testPath = getFilePath(id);
+ FileSystem fs = FileSystem.getLocal(conf);
+ return fs.exists(testPath)
+ ? fs.getFileStatus(testPath).getModificationTime()
+ : 0;
+ }
+ }
+
+ /**
+ * Variant of {@link TestResourceUsageEmulatorPlugin} identified by 'others';
+ * it creates the files 'others.init' and 'others' in the test directory.
+ */
+ static class TestOthers extends TestResourceUsageEmulatorPlugin {
+ static final String ID = "others";
+
+ @Override
+ protected String getIdentifier() {
+ return ID;
+ }
+ }
+
+ /**
+ * Variant of {@link TestResourceUsageEmulatorPlugin} identified by 'cpu';
+ * it creates the files 'cpu.init' and 'cpu' in the test directory.
+ */
+ static class TestCpu extends TestResourceUsageEmulatorPlugin {
+ static final String ID = "cpu";
+
+ @Override
+ protected String getIdentifier() {
+ return ID;
+ }
+ }
+
+ /**
+ * Test {@link ResourceUsageMatcher}.
+ */
+ @Test
+ public void testResourceUsageMatcher() throws Exception {
+ ResourceUsageMatcher matcher = new ResourceUsageMatcher();
+ Configuration conf = new Configuration();
+ conf.setClass(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS,
+ TestResourceUsageEmulatorPlugin.class,
+ ResourceUsageEmulatorPlugin.class);
+ long currentTime = System.currentTimeMillis();
+
+ matcher.configure(conf, null, null, null);
+
+ matcher.matchResourceUsage();
+
+ String id = TestResourceUsageEmulatorPlugin.DEFAULT_IDENTIFIER;
+ long result =
+ TestResourceUsageEmulatorPlugin.testInitialization(id, conf);
+ assertTrue("Resource usage matcher failed to initialize the configured"
+ + " plugin", result > currentTime);
+ result = TestResourceUsageEmulatorPlugin.testEmulation(id, conf);
+ assertTrue("Resource usage matcher failed to load and emulate the"
+ + " configured plugin", result > currentTime);
+
+ // test plugin order to first emulate cpu and then others
+ conf.setStrings(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS,
+ TestCpu.class.getName() + "," + TestOthers.class.getName());
+
+ matcher.configure(conf, null, null, null);
+
+ // test the initialization order
+ long time1 =
+ TestResourceUsageEmulatorPlugin.testInitialization(TestCpu.ID, conf);
+ long time2 =
+ TestResourceUsageEmulatorPlugin.testInitialization(TestOthers.ID,
+ conf);
+ assertTrue("Resource usage matcher failed to initialize the configured"
+ + " plugins in order", time1 < time2);
+
+ matcher.matchResourceUsage();
+
+ // test the emulation order: the cpu usage emulator plugin is configured 1st
+ // and then the others plugin, so cpu's touch file must be the older one.
+ time1 =
+ TestResourceUsageEmulatorPlugin.testEmulation(TestCpu.ID, conf);
+ time2 =
+ TestResourceUsageEmulatorPlugin.testEmulation(TestOthers.ID,
+ conf);
+ assertTrue("Resource usage matcher failed to emulate the configured plugins"
+ + " in order", time1 < time2);
+ }
+
+ /**
+ * Fakes the cumulative usage using {@link FakeCpuUsageEmulatorCore}.
+ */
+ static class FakeResourceUsageMonitor extends DummyResourceCalculatorPlugin {
+ private FakeCpuUsageEmulatorCore core;
+
+ public FakeResourceUsageMonitor(FakeCpuUsageEmulatorCore core) {
+ this.core = core;
+ }
+
+ /**
+ * A dummy CPU usage monitor. Every call to
+ * {@link ResourceCalculatorPlugin#getCumulativeCpuTime()} will return the
+ * value of {@link FakeCpuUsageEmulatorCore#getCpuUsage()}.
+ */
+ @Override
+ public long getCumulativeCpuTime() {
+ return core.getCpuUsage();
+ }
+
+ /**
+ * Returns a {@link ProcResourceValues} with cumulative cpu usage
+ * computed using {@link #getCumulativeCpuTime()}; the other values are -1.
+ */
+ @Override
+ public ProcResourceValues getProcResourceValues() {
+ long usageValue = getCumulativeCpuTime();
+ return new ProcResourceValues(usageValue, -1, -1);
+ }
+ }
+
+ /**
+ * A dummy {@link Progressive} implementation that allows users to set the
+ * progress for testing. The {@link Progressive#getProgress()} call will
+ * return the last progress value set using
+ * {@link FakeProgressive#setProgress(float)}, or 0 if none was set.
+ */
+ static class FakeProgressive implements Progressive {
+ private float progress = 0F;
+ @Override
+ public float getProgress() {
+ return progress;
+ }
+
+ void setProgress(float progress) {
+ this.progress = progress;
+ }
+ }
+
+ /**
+ * A dummy reporter for {@link LoadJob.ResourceUsageMatcherRunner}.
+ */
+ private static class DummyReporter extends StatusReporter {
+ private Progressive progress;
+
+ DummyReporter(Progressive progress) {
+ this.progress = progress;
+ }
+
+ @Override
+ public org.apache.hadoop.mapreduce.Counter getCounter(Enum<?> name) {
+ return null; // no counters are tracked by this reporter
+ }
+
+ @Override
+ public org.apache.hadoop.mapreduce.Counter getCounter(String group,
+ String name) {
+ return null; // no counters are tracked by this reporter
+ }
+
+ @Override
+ public void progress() {
+ }
+
+ @Override
+ public float getProgress() {
+ return progress.getProgress(); // delegate to the wrapped Progressive
+ }
+
+ @Override
+ public void setStatus(String status) {
+ }
+ }
+
+ // Extends ResourceUsageMatcherRunner so that tests can call match() directly.
+ @SuppressWarnings("unchecked")
+ private static class FakeResourceUsageMatcherRunner
+ extends ResourceUsageMatcherRunner {
+ FakeResourceUsageMatcherRunner(TaskInputOutputContext context,
+ ResourceUsageMetrics metrics) {
+ super(context, metrics);
+ }
+
+ // invokes ResourceUsageMatcherRunner.match() once
+ void test() throws Exception {
+ super.match();
+ }
+ }
+
+ /**
+ * Test {@link LoadJob.ResourceUsageMatcherRunner}.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testResourceUsageMatcherRunner() throws Exception {
+ Configuration conf = new Configuration();
+ FakeProgressive progress = new FakeProgressive();
+
+ // set the resource calculator plugin
+ conf.setClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
+ DummyResourceCalculatorPlugin.class,
+ ResourceCalculatorPlugin.class);
+ // set the resources
+ // set the resource implementation class
+ conf.setClass(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS,
+ TestResourceUsageEmulatorPlugin.class,
+ ResourceUsageEmulatorPlugin.class);
+
+ long currentTime = System.currentTimeMillis();
+
+ // initialize the matcher class
+ TaskAttemptID id = new TaskAttemptID("test", 1, TaskType.MAP, 1, 1);
+ StatusReporter reporter = new DummyReporter(progress);
+ TaskInputOutputContext context =
+ new MapContextImpl(conf, id, null, null, null, reporter, null);
+ FakeResourceUsageMatcherRunner matcher =
+ new FakeResourceUsageMatcherRunner(context, null);
+
+ // check if the matcher initialized the plugin
+ String identifier = TestResourceUsageEmulatorPlugin.DEFAULT_IDENTIFIER;
+ long initTime =
+ TestResourceUsageEmulatorPlugin.testInitialization(identifier, conf);
+ assertTrue("ResourceUsageMatcherRunner failed to initialize the"
+ + " configured plugin", initTime > currentTime);
+
+ // check the progress
+ assertEquals("Progress mismatch in ResourceUsageMatcherRunner",
+ 0, progress.getProgress(), 0D);
+
+ // call match() and check that the plugin's emulate() touched its file
+ progress.setProgress(0.01f);
+ currentTime = System.currentTimeMillis();
+ matcher.test();
+ long emulateTime =
+ TestResourceUsageEmulatorPlugin.testEmulation(identifier, conf);
+ assertTrue("ResourceUsageMatcherRunner failed to load and emulate"
+ + " the configured plugin", emulateTime > currentTime);
+ }
+
+ /**
+ * Test {@link CumulativeCpuUsageEmulatorPlugin}'s core CPU usage emulation
+ * engine.
+ */
+ @Test
+ public void testCpuUsageEmulator() throws IOException {
+ // test CpuUsageEmulator calibration with fake resource calculator plugin
+ long target = 100000L; // 100 secs
+ int unitUsage = 50;
+ FakeCpuUsageEmulatorCore fakeCpuEmulator = new FakeCpuUsageEmulatorCore();
+ fakeCpuEmulator.setUnitUsage(unitUsage);
+ FakeResourceUsageMonitor fakeMonitor =
+ new FakeResourceUsageMonitor(fakeCpuEmulator);
+
+ // calibrate for 100ms
+ fakeCpuEmulator.calibrate(fakeMonitor, target);
+
+ // by default, CpuUsageEmulator.calibrate() will consume 100ms of CPU usage
+ assertEquals("Fake calibration failed",
+ 100, fakeMonitor.getCumulativeCpuTime());
+ assertEquals("Fake calibration failed",
+ 100, fakeCpuEmulator.getCpuUsage());
+ // with a unit usage of 50, reaching 100 means that
+ // performUnitComputation() was called exactly twice
+ assertEquals("Fake calibration failed",
+ 2, fakeCpuEmulator.getNumCalls());
+ }
+
+ /**
+ * Fake core: each unit computation adds unitUsage to the faked CPU usage.
+ */
+ private static class FakeCpuUsageEmulatorCore
+ extends DefaultCpuUsageEmulator {
+ private int numCalls = 0; // number of performUnitComputation() calls
+ private int unitUsage = 1; // usage added by each unit computation
+ private int cpuUsage = 0; // accumulated fake usage
+
+ @Override
+ protected void performUnitComputation() {
+ ++numCalls;
+ cpuUsage += unitUsage;
+ }
+
+ int getNumCalls() {
+ return numCalls;
+ }
+
+ int getCpuUsage() {
+ return cpuUsage;
+ }
+
+ void reset() { // clears the counters but keeps the configured unitUsage
+ numCalls = 0;
+ cpuUsage = 0;
+ }
+
+ void setUnitUsage(int unitUsage) {
+ this.unitUsage = unitUsage;
+ }
+ }
+
+ // Creates a ResourceUsageMetrics with cpu, memory and heap usage set to target
+ private static ResourceUsageMetrics createMetrics(long target) {
+ ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+ metrics.setCumulativeCpuUsage(target);
+ metrics.setVirtualMemoryUsage(target);
+ metrics.setPhysicalMemoryUsage(target);
+ metrics.setHeapUsage(target);
+ return metrics;
+ }
+
+ /**
+ * Test {@link CumulativeCpuUsageEmulatorPlugin}.
+ */
+ @Test
+ public void testCumulativeCpuUsageEmulatorPlugin() throws Exception {
+ Configuration conf = new Configuration();
+ long targetCpuUsage = 1000L;
+ int unitCpuUsage = 50;
+
+ // fake progress indicator
+ FakeProgressive fakeProgress = new FakeProgressive();
+
+ // fake cpu usage generator
+ FakeCpuUsageEmulatorCore fakeCore = new FakeCpuUsageEmulatorCore();
+ fakeCore.setUnitUsage(unitCpuUsage);
+
+ // a cumulative cpu usage emulator with fake core
+ CumulativeCpuUsageEmulatorPlugin cpuPlugin =
+ new CumulativeCpuUsageEmulatorPlugin(fakeCore);
+
+ // test with invalid or missing resource usage value
+ ResourceUsageMetrics invalidUsage = createMetrics(0);
+ cpuPlugin.initialize(conf, invalidUsage, null, null);
+
+ // test if disabled cpu emulation plugin's emulate() call is a no-operation
+ // this will test if the emulation plugin is disabled or not
+ int numCallsPre = fakeCore.getNumCalls();
+ long cpuUsagePre = fakeCore.getCpuUsage();
+ cpuPlugin.emulate();
+ int numCallsPost = fakeCore.getNumCalls();
+ long cpuUsagePost = fakeCore.getCpuUsage();
+
+ // test if no calls are made to the cpu usage emulator core
+ assertEquals("Disabled cumulative CPU usage emulation plugin works!",
+ numCallsPre, numCallsPost);
+
+ // test if the usage reported by the cpu usage emulator core is unchanged
+ assertEquals("Disabled cumulative CPU usage emulation plugin works!",
+ cpuUsagePre, cpuUsagePost);
+
+ // test with valid resource usage value
+ ResourceUsageMetrics metrics = createMetrics(targetCpuUsage);
+
+ // fake monitor
+ ResourceCalculatorPlugin monitor = new FakeResourceUsageMonitor(fakeCore);
+
+ // test with default emulation interval
+ testEmulationAccuracy(conf, fakeCore, monitor, metrics, cpuPlugin,
+ targetCpuUsage, targetCpuUsage / unitCpuUsage);
+
+ // test with custom value for emulation interval of 20%
+ conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
+ 0.2F);
+ testEmulationAccuracy(conf, fakeCore, monitor, metrics, cpuPlugin,
+ targetCpuUsage, targetCpuUsage / unitCpuUsage);
+
+ // test if emulation interval boundary is respected (unit usage = 1)
+ // test the case where the current progress is less than threshold
+ fakeProgress = new FakeProgressive(); // initialize
+ fakeCore.reset();
+ fakeCore.setUnitUsage(1);
+ conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
+ 0.25F);
+ cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
+ // take a snapshot after the initialization
+ long initCpuUsage = monitor.getCumulativeCpuTime();
+ long initNumCalls = fakeCore.getNumCalls();
+ // test with 0 progress
+ testEmulationBoundary(0F, fakeCore, fakeProgress, cpuPlugin, initCpuUsage,
+ initNumCalls, "[no-op, 0 progress]");
+ // test with 24% progress
+ testEmulationBoundary(0.24F, fakeCore, fakeProgress, cpuPlugin,
+ initCpuUsage, initNumCalls, "[no-op, 24% progress]");
+ // test with 25% progress
+ // target = 1000ms, target emulation at 25% = 250ms,
+ // weighted target = 1000 * 0.25^4 (we are using progress^4 as the weight)
+ // ~ 4
+ // but current usage = init-usage = 100, hence expected = 100
+ testEmulationBoundary(0.25F, fakeCore, fakeProgress, cpuPlugin,
+ initCpuUsage, initNumCalls, "[op, 25% progress]");
+
+ // test with 80% progress
+ // target = 1000ms, target emulation at 80% = 800ms,
+ // weighted target = 1000 * 0.80^4 (we are using progress^4 as the weight)
+ // ~ 410
+ // current-usage = init-usage = 100, hence expected-usage = 410
+ testEmulationBoundary(0.80F, fakeCore, fakeProgress, cpuPlugin, 410, 410,
+ "[op, 80% progress]");
+
+ // now test if the final call with 100% progress ramps up the CPU usage
+ testEmulationBoundary(1F, fakeCore, fakeProgress, cpuPlugin, targetCpuUsage,
+ targetCpuUsage, "[op, 100% progress]");
+
+ // test if emulation interval boundary is respected (unit usage = 50)
+ // test the case where the current progress is less than threshold
+ fakeProgress = new FakeProgressive(); // initialize
+ fakeCore.reset();
+ fakeCore.setUnitUsage(unitCpuUsage);
+ conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
+ 0.40F);
+ cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
+ // take a snapshot after the initialization
+ initCpuUsage = monitor.getCumulativeCpuTime();
+ initNumCalls = fakeCore.getNumCalls();
+ // test with 0 progress
+ testEmulationBoundary(0F, fakeCore, fakeProgress, cpuPlugin, initCpuUsage,
+ initNumCalls, "[no-op, 0 progress]");
+ // test with 39% progress
+ testEmulationBoundary(0.39F, fakeCore, fakeProgress, cpuPlugin,
+ initCpuUsage, initNumCalls, "[no-op, 39% progress]");
+ // test with 40% progress
+ // target = 1000ms, target emulation at 40% = 400ms,
+ // weighted target = 1000 * 0.40^4 (we are using progress^4 as the weight)
+ // ~ 26
+ // current-usage = init-usage = 100, hence expected-usage = 100
+ testEmulationBoundary(0.40F, fakeCore, fakeProgress, cpuPlugin,
+ initCpuUsage, initNumCalls, "[op, 40% progress]");
+
+ // test with 90% progress
+ // target = 1000ms, target emulation at 90% = 900ms,
+ // weighted target = 1000 * 0.90^4 (we are using progress^4 as the weight)
+ // ~ 657
+ // current-usage = init-usage = 100, hence expected-usage = 657 but
+ // the fake-core increases in steps of 50, hence final target = 700
+ testEmulationBoundary(0.90F, fakeCore, fakeProgress, cpuPlugin, 700,
+ 700 / unitCpuUsage, "[op, 90% progress]");
+
+ // now test if the final call with 100% progress ramps up the CPU usage
+ testEmulationBoundary(1F, fakeCore, fakeProgress, cpuPlugin, targetCpuUsage,
+ targetCpuUsage / unitCpuUsage, "[op, 100% progress]");
+ }
+
+ // test whether the CPU usage emulator achieves the desired target using
+ // the desired number of calls to the underlying core engine.
+ private static void testEmulationAccuracy(Configuration conf,
+ FakeCpuUsageEmulatorCore fakeCore,
+ ResourceCalculatorPlugin monitor,
+ ResourceUsageMetrics metrics,
+ CumulativeCpuUsageEmulatorPlugin cpuPlugin,
+ long expectedTotalCpuUsage, long expectedTotalNumCalls)
+ throws Exception {
+ FakeProgressive fakeProgress = new FakeProgressive();
+ fakeCore.reset();
+ cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
+ int numLoops = 0;
+ while (fakeProgress.getProgress() < 1) {
+ ++numLoops;
+ float progress = (float)numLoops / 100;
+ fakeProgress.setProgress(progress);
+ cpuPlugin.emulate();
+ }
+
+ // test if the resource plugin shows the expected invocations
+ assertEquals("Cumulative cpu usage emulator plugin failed (num calls)!",
+ expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
+ // test if the resource plugin shows the expected usage
+ assertEquals("Cumulative cpu usage emulator plugin failed (total usage)!",
+ expectedTotalCpuUsage, fakeCore.getCpuUsage(), 0L);
+ }
+
+ // sets the given progress, calls emulate() once and checks the fake core's
+ // total cpu usage and total number of calls against the expected values
+ private static void testEmulationBoundary(float progress,
+ FakeCpuUsageEmulatorCore fakeCore, FakeProgressive fakeProgress,
+ CumulativeCpuUsageEmulatorPlugin cpuPlugin, long expectedTotalCpuUsage,
+ long expectedTotalNumCalls, String info) throws Exception {
+ fakeProgress.setProgress(progress);
+ cpuPlugin.emulate();
+
+ assertEquals("Emulation interval test for cpu usage failed " + info + "!",
+ expectedTotalCpuUsage, fakeCore.getCpuUsage(), 0L);
+ assertEquals("Emulation interval test for num calls failed " + info + "!",
+ expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
+ }
+}
\ No newline at end of file
diff --git a/mapreduce/src/docs/src/documentation/content/xdocs/gridmix.xml b/mapreduce/src/docs/src/documentation/content/xdocs/gridmix.xml
index a4dc35785e6..c196147281a 100644
--- a/mapreduce/src/docs/src/documentation/content/xdocs/gridmix.xml
+++ b/mapreduce/src/docs/src/documentation/content/xdocs/gridmix.xml
@@ -663,8 +663,53 @@ hadoop jar <gridmix-jar> org.apache.hadoop.mapred.gridmix.Gridmix \
High-Ram feature emulation can be disabled by setting
gridmix.highram-emulation.enable
to
- false
. By default High-Ram feature emulation is enabled.
- Note that this feature works only for jobs of type LOADJOB.
+ false
.
+
+
+
+
+ Emulating resource usages
+ Usages of resources like CPU, physical memory, virtual memory, JVM heap
+ etc are recorded by MapReduce using its task counters. This information
+ is used by GridMix to emulate the resource usages in the simulated
+ tasks. Emulating resource usages will help GridMix exert similar load
+ on the test cluster as seen in the actual cluster.
+
+ MapReduce tasks use up resources during their entire lifetime. GridMix
+ also tries to mimic this behavior by spanning resource usage emulation
+ across the entire lifetime of the simulated task. Each resource to be
+ emulated should have an emulator associated with it.
+ Each such emulator should implement the
+ org.apache.hadoop.mapred.gridmix.emulators.resourceusage
+ .ResourceUsageEmulatorPlugin
interface. Resource
+ emulators in GridMix are plugins that can be
+ configured (plugged in or out) before every run. GridMix users can
+ configure multiple emulator plugins by passing a comma
+ separated list of emulators as a value for the
+ gridmix.emulators.resource-usage.plugins
parameter.
+
+ List of emulators shipped with GridMix:
+
+
+ - Cumulative CPU usage emulator:
+ GridMix uses the cumulative CPU usage value published by Rumen
+ and makes sure that the total cumulative CPU usage of the simulated
+ task is close to the value published by Rumen. GridMix can be
+ configured to emulate cumulative CPU usage by adding
+
org.apache.hadoop.mapred.gridmix.emulators.resourceusage
+ .CumulativeCpuUsageEmulatorPlugin
to the list of emulator
+ plugins configured for the
+ gridmix.emulators.resource-usage.plugins
parameter.
+ CPU usage emulator is designed in such a way that
+ it only emulates at specific progress boundaries of the task. This
+ interval can be configured using
+ gridmix.emulators.resource-usage.cpu.frequency
. The
+ default value for this parameter is 0.1
i.e
+ 10%
.
+
+
+ Note that GridMix will emulate resource usages only for jobs of type
+ LOADJOB.
@@ -677,10 +722,6 @@ hadoop jar <gridmix-jar> org.apache.hadoop.mapred.gridmix.Gridmix \
the following characteristics of job load are not currently captured in
job traces and cannot be accurately reproduced in GridMix:
- - CPU Usage - We have no data for per-task CPU usage, so we
- cannot even attempt an approximation. GridMix tasks are never
- CPU-bound independent of I/O, though this surely happens in
- practice.
- Filesystem Properties - No attempt is made to match block
sizes, namespace hierarchies, or any property of input, intermediate
or output data other than the bytes/records consumed and emitted from