MAPREDUCE-6829. Add peak memory usage counter for each task. (Miklos Szegedi via kasha)

(cherry picked from commit c65f884fc7)
This commit is contained in:
Karthik Kambatla 2017-01-26 11:08:13 -08:00
parent 827cd30923
commit a91cc63d37
6 changed files with 256 additions and 5 deletions

View File

@ -111,7 +111,11 @@ abstract public class Task implements Writable, Configurable {
CPU_MILLISECONDS,
PHYSICAL_MEMORY_BYTES,
VIRTUAL_MEMORY_BYTES,
COMMITTED_HEAP_BYTES
COMMITTED_HEAP_BYTES,
MAP_PHYSICAL_MEMORY_BYTES_MAX,
MAP_VIRTUAL_MEMORY_BYTES_MAX,
REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
REDUCE_VIRTUAL_MEMORY_BYTES_MAX
}
/**
@ -949,6 +953,24 @@ abstract public class Task implements Writable, Configurable {
if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
}
if (pMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
TaskCounter counter = isMapTask() ?
TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX :
TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX;
Counters.Counter pMemCounter =
counters.findCounter(counter);
pMemCounter.setValue(Math.max(pMemCounter.getValue(), pMem));
}
if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
TaskCounter counter = isMapTask() ?
TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX :
TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
Counters.Counter vMemCounter =
counters.findCounter(counter);
vMemCounter.setValue(Math.max(vMemCounter.getValue(), vMem));
}
}
/**

View File

@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public enum TaskCounter {
MAP_INPUT_RECORDS,
MAP_INPUT_RECORDS,
MAP_OUTPUT_RECORDS,
MAP_SKIPPED_RECORDS,
MAP_OUTPUT_BYTES,
@ -47,5 +47,9 @@ public enum TaskCounter {
CPU_MILLISECONDS,
PHYSICAL_MEMORY_BYTES,
VIRTUAL_MEMORY_BYTES,
COMMITTED_HEAP_BYTES
COMMITTED_HEAP_BYTES,
MAP_PHYSICAL_MEMORY_BYTES_MAX,
MAP_VIRTUAL_MEMORY_BYTES_MAX,
REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
}

View File

@ -100,7 +100,11 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
@Override
public void increment(long incr) {
value += incr;
if (key.name().endsWith("_MAX")) {
value = value > incr ? value : incr;
} else {
value += incr;
}
}
@Override

View File

@ -37,3 +37,7 @@ COMMITTED_HEAP_BYTES.name= Total committed heap usage (bytes)
CPU_MILLISECONDS.name= CPU time spent (ms)
PHYSICAL_MEMORY_BYTES.name= Physical memory (bytes) snapshot
VIRTUAL_MEMORY_BYTES.name= Virtual memory (bytes) snapshot
MAP_PHYSICAL_MEMORY_BYTES_MAX.name= Peak Map Physical memory (bytes)
MAP_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Map Virtual memory (bytes)
REDUCE_PHYSICAL_MEMORY_BYTES_MAX.name=Peak Reduce Physical memory (bytes)
REDUCE_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Reduce Virtual memory (bytes)

View File

@ -348,7 +348,36 @@ public class TestCounters {
counterGroup.findCounter("Unknown");
Assert.assertNull(count2);
}
@SuppressWarnings("rawtypes")
@Test
public void testTaskCounter() {
GroupFactory groupFactory = new GroupFactoryForTest();
FrameworkGroupFactory frameworkGroupFactory =
groupFactory.newFrameworkGroupFactory(TaskCounter.class);
Group group = (Group) frameworkGroupFactory.newGroup("TaskCounter");
FrameworkCounterGroup counterGroup =
(FrameworkCounterGroup) group.getUnderlyingGroup();
org.apache.hadoop.mapreduce.Counter count1 =
counterGroup.findCounter(
TaskCounter.PHYSICAL_MEMORY_BYTES.toString());
Assert.assertNotNull(count1);
count1.increment(10);
count1.increment(10);
Assert.assertEquals(20, count1.getValue());
// Verify no exception get thrown when finding an unknown counter
org.apache.hadoop.mapreduce.Counter count2 =
counterGroup.findCounter(
TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX.toString());
Assert.assertNotNull(count2);
count2.increment(5);
count2.increment(10);
Assert.assertEquals(10, count2.getValue());
}
@Test
public void testFilesystemCounter() {
GroupFactory groupFactory = new GroupFactoryForTest();

View File

@ -40,10 +40,12 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -751,4 +753,190 @@ public class TestJobCounters {
}
}
/**
* Test mapper.
*/
public static class TokenizerMapper extends
org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, ONE);
}
}
}
/**
* Test reducer.
*/
public static class IntSumReducer extends
org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable>{
/**
* Test customer counter.
*/
public enum Counters { MY_COUNTER_MAX }
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
context.getCounter(Counters.MY_COUNTER_MAX).increment(100);
}
}
/**
* Mock resource reporting.
*/
public static class MockResourceCalculatorProcessTree
extends ResourceCalculatorProcessTree {
public MockResourceCalculatorProcessTree(String root) {
super(root);
}
@Override
public void updateProcessTree() {
}
@Override
public String getProcessTreeDump() {
return "";
}
@Override
public long getCumulativeCpuTime() {
return 0;
}
@Override
public boolean checkPidPgrpidForMatch() {
return true;
}
@Override
public long getRssMemorySize() {
return 1024;
}
@Override
public long getVirtualMemorySize() {
return 2000;
}
@Override
public float getCpuUsagePercent() {
return 0;
}
}
@Test
public void testMockResourceCalculatorProcessTree() {
ResourceCalculatorProcessTree tree;
tree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
"1", TestJobCounters.MockResourceCalculatorProcessTree.class,
new Configuration());
assertNotNull(tree);
}
/**
* End to end test of maximum counters.
* @throws IOException test failed
* @throws ClassNotFoundException test failed
* @throws InterruptedException test failed
*/
@Test
public void testMaxCounter()
throws IOException, ClassNotFoundException, InterruptedException {
// Create mapreduce cluster
MiniMRClientCluster mrCluster = MiniMRClientClusterFactory.create(
this.getClass(), 2, new Configuration());
try {
// Setup input and output paths
Path rootDir =
new Path(System.getProperty("test.build.data", "/tmp"));
Path testRootDir = new Path(rootDir, "testMaxCounter");
Path testInputDir = new Path(testRootDir, "input");
Path testOutputDir = new Path(testRootDir, "output");
FileSystem fs = FileSystem.getLocal(new Configuration());
fs.mkdirs(testInputDir);
Path testInputFile = new Path(testInputDir, "file01");
FSDataOutputStream stream =
fs.create(testInputFile);
stream.writeChars("foo");
stream.writeChars("bar");
stream.close();
fs.delete(testOutputDir, true);
// Run job (1 mapper, 2 reducers)
Configuration conf = new Configuration();
conf.setClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
MockResourceCalculatorProcessTree.class,
ResourceCalculatorProcessTree.class);
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(2); // make sure we have double here to test max
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
.addInputPath(job, testInputDir);
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
.setOutputPath(job, testOutputDir);
assertTrue(job.waitForCompletion(true));
// Verify physical numbers
org.apache.hadoop.mapreduce.Counter maxMap =
job.getCounters().findCounter(
TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX);
org.apache.hadoop.mapreduce.Counter maxReduce =
job.getCounters().findCounter(
TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX);
org.apache.hadoop.mapreduce.Counter allP =
job.getCounters().findCounter(
TaskCounter.PHYSICAL_MEMORY_BYTES);
assertEquals(1024, maxMap.getValue());
assertEquals(1024, maxReduce.getValue());
assertEquals(3072, allP.getValue());
// Verify virtual numbers
org.apache.hadoop.mapreduce.Counter maxMapV =
job.getCounters().findCounter(
TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX);
org.apache.hadoop.mapreduce.Counter maxReduceV =
job.getCounters().findCounter(
TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX);
org.apache.hadoop.mapreduce.Counter allV =
job.getCounters().findCounter(
TaskCounter.VIRTUAL_MEMORY_BYTES);
assertEquals(2000, maxMapV.getValue());
assertEquals(2000, maxReduceV.getValue());
assertEquals(6000, allV.getValue());
// Make sure customer counters are not affected by the _MAX
// code in FrameworkCountersGroup
org.apache.hadoop.mapreduce.Counter customerCounter =
job.getCounters().findCounter(
IntSumReducer.Counters.MY_COUNTER_MAX);
assertEquals(200, customerCounter.getValue());
fs.delete(testInputDir, true);
fs.delete(testOutputDir, true);
} finally {
mrCluster.stop();
}
}
}