diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 119d6a7393a..c1ae0ab9698 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -110,7 +110,11 @@ abstract public class Task implements Writable, Configurable {
     CPU_MILLISECONDS,
     PHYSICAL_MEMORY_BYTES,
     VIRTUAL_MEMORY_BYTES,
-    COMMITTED_HEAP_BYTES
+    COMMITTED_HEAP_BYTES,
+    MAP_PHYSICAL_MEMORY_BYTES_MAX,
+    MAP_VIRTUAL_MEMORY_BYTES_MAX,
+    REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
+    REDUCE_VIRTUAL_MEMORY_BYTES_MAX
   }
 
   /**
@@ -964,6 +968,24 @@ abstract public class Task implements Writable, Configurable {
     if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
       counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
     }
+
+    if (pMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
+      TaskCounter counter = isMapTask() ?
+          TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX :
+          TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX;
+      Counters.Counter pMemCounter =
+          counters.findCounter(counter);
+      pMemCounter.setValue(Math.max(pMemCounter.getValue(), pMem));
+    }
+
+    if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
+      TaskCounter counter = isMapTask() ?
+          TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX :
+          TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
+      Counters.Counter vMemCounter =
+          counters.findCounter(counter);
+      vMemCounter.setValue(Math.max(vMemCounter.getValue(), vMem));
+    }
   }
 
   /**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java
index 42ef067aceb..0fab96c4559 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public enum TaskCounter {
-  MAP_INPUT_RECORDS, 
+  MAP_INPUT_RECORDS,
   MAP_OUTPUT_RECORDS,
   MAP_SKIPPED_RECORDS,
   MAP_OUTPUT_BYTES,
@@ -47,5 +47,9 @@ public enum TaskCounter {
   CPU_MILLISECONDS,
   PHYSICAL_MEMORY_BYTES,
   VIRTUAL_MEMORY_BYTES,
-  COMMITTED_HEAP_BYTES
+  COMMITTED_HEAP_BYTES,
+  MAP_PHYSICAL_MEMORY_BYTES_MAX,
+  MAP_VIRTUAL_MEMORY_BYTES_MAX,
+  REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
+  REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
 }
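The Task.java hunk above records each memory sample as a high-water mark (setValue(Math.max(old, sample))), and the TaskCounter hunk exposes the four resulting values as public counters. Once a job completes, the peaks can be read back through the standard Counters API. A minimal sketch, assuming a completed Job; PeakMemoryReport is an illustrative name, not part of this patch:

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCounter;

    public class PeakMemoryReport {
      // Print the peak and snapshot memory counters of a completed job.
      public static void print(Job job) throws IOException {
        Counters counters = job.getCounters();
        long mapPeak = counters.findCounter(
            TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX).getValue();
        long reducePeak = counters.findCounter(
            TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX).getValue();
        // PHYSICAL_MEMORY_BYTES keeps its old meaning: the last snapshot
        // of each task, aggregated across all tasks of the job.
        long snapshotSum = counters.findCounter(
            TaskCounter.PHYSICAL_MEMORY_BYTES).getValue();
        System.out.println("Peak map physical memory (bytes):    " + mapPeak);
        System.out.println("Peak reduce physical memory (bytes): " + reducePeak);
        System.out.println("Summed physical memory (bytes):      " + snapshotSum);
      }
    }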
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
index e78fe2e6375..b51f52882b2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
@@ -100,7 +100,11 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
 
     @Override
     public void increment(long incr) {
-      value += incr;
+      if (key.name().endsWith("_MAX")) {
+        value = value > incr ? value : incr;
+      } else {
+        value += incr;
+      }
     }
 
     @Override
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties
index d54b980b241..0fd1028a29e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties
@@ -37,3 +37,7 @@ COMMITTED_HEAP_BYTES.name= Total committed heap usage (bytes)
 CPU_MILLISECONDS.name= CPU time spent (ms)
 PHYSICAL_MEMORY_BYTES.name= Physical memory (bytes) snapshot
 VIRTUAL_MEMORY_BYTES.name= Virtual memory (bytes) snapshot
+MAP_PHYSICAL_MEMORY_BYTES_MAX.name= Peak Map Physical memory (bytes)
+MAP_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Map Virtual memory (bytes)
+REDUCE_PHYSICAL_MEMORY_BYTES_MAX.name= Peak Reduce Physical memory (bytes)
+REDUCE_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Reduce Virtual memory (bytes)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
index 5e2763e0036..010f0f31ea6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
@@ -348,7 +348,36 @@ public class TestCounters {
       counterGroup.findCounter("Unknown");
     Assert.assertNull(count2);
   }
-  
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testTaskCounter() {
+    GroupFactory groupFactory = new GroupFactoryForTest();
+    FrameworkGroupFactory frameworkGroupFactory =
+        groupFactory.newFrameworkGroupFactory(TaskCounter.class);
+    Group group = (Group) frameworkGroupFactory.newGroup("TaskCounter");
+
+    FrameworkCounterGroup counterGroup =
+        (FrameworkCounterGroup) group.getUnderlyingGroup();
+
+    // A regular counter accumulates increments
+    org.apache.hadoop.mapreduce.Counter count1 =
+        counterGroup.findCounter(
+            TaskCounter.PHYSICAL_MEMORY_BYTES.toString());
+    Assert.assertNotNull(count1);
+    count1.increment(10);
+    count1.increment(10);
+    Assert.assertEquals(20, count1.getValue());
+
+    // Verify that a _MAX counter records the maximum value seen,
+    // not the running sum
+    org.apache.hadoop.mapreduce.Counter count2 =
+        counterGroup.findCounter(
+            TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX.toString());
+    Assert.assertNotNull(count2);
+    count2.increment(5);
+    count2.increment(10);
+    Assert.assertEquals(10, count2.getValue());
+  }
+
   @Test
   public void testFilesystemCounter() {
     GroupFactory groupFactory = new GroupFactoryForTest();
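The increment() change above is the core of the patch: a framework counter whose enum name ends in "_MAX" keeps the largest value passed to it instead of the running sum, which is exactly what testTaskCounter asserts (incrementing by 5 and then 10 leaves 10, not 15). A self-contained sketch of that dispatch; MiniCounter is an illustrative stand-in for FrameworkCounterGroup's internal FrameworkCounter, not code from this patch:

    public class MiniCounter {
      private final String name;
      private long value;

      MiniCounter(String name) {
        this.name = name;
      }

      void increment(long incr) {
        // Same dispatch as the patched increment(): names ending in
        // "_MAX" record a maximum, all other counters accumulate.
        if (name.endsWith("_MAX")) {
          value = Math.max(value, incr);
        } else {
          value += incr;
        }
      }

      public static void main(String[] args) {
        MiniCounter sum = new MiniCounter("PHYSICAL_MEMORY_BYTES");
        MiniCounter max = new MiniCounter("MAP_PHYSICAL_MEMORY_BYTES_MAX");
        for (long v : new long[] {5, 10}) {
          sum.increment(v);
          max.increment(v);
        }
        System.out.println(sum.value); // 15: ordinary counters sum
        System.out.println(max.value); // 10: _MAX counters keep the peak
      }
    }

Keying the behavior off a name suffix leaves the counter enums and the wire format untouched, and it only affects framework counters: user-defined counters take the generic counter path, which the end-to-end test below verifies.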
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
index 8b692cafc5e..850c00afdf3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java
@@ -40,10 +40,12 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
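The hunk that follows defines a mock ResourceCalculatorProcessTree reporting fixed sizes (1024 bytes of RSS, 2000 bytes of virtual memory), so the end-to-end test can assert exact counter values. The mock is wired in through the MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE key added to the imports above; a sketch of that wiring (ProcessTreeWiring is an illustrative name, and the fallback noted in the comment is the expected platform default, e.g. the procfs-based tree on Linux):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.TestJobCounters;
    import org.apache.hadoop.mapreduce.MRConfig;
    import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;

    public class ProcessTreeWiring {
      public static void main(String[] args) {
        // Tell the MR framework which process-tree class to instantiate
        // for resource monitoring; left unset, it picks a platform default.
        Configuration conf = new Configuration();
        conf.setClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
            TestJobCounters.MockResourceCalculatorProcessTree.class,
            ResourceCalculatorProcessTree.class);

        // The factory used by the smoke test below; passing the class
        // directly bypasses the configuration lookup.
        ResourceCalculatorProcessTree tree =
            ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
                "1", TestJobCounters.MockResourceCalculatorProcessTree.class,
                conf);
        System.out.println(tree.getRssMemorySize()); // 1024 with the mock
      }
    }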
@@ -751,4 +753,190 @@ public class TestJobCounters {
     }
   }
 
+  /**
+   * Test mapper.
+   */
+  public static class TokenizerMapper extends
+      org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
+
+    private final static IntWritable ONE = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, ONE);
+      }
+    }
+  }
+
+  /**
+   * Test reducer.
+   */
+  public static class IntSumReducer extends
+      org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
+    /**
+     * Test custom counter.
+     */
+    public enum Counters { MY_COUNTER_MAX }
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values, Context context)
+        throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+      context.getCounter(Counters.MY_COUNTER_MAX).increment(100);
+    }
+  }
+
+  /**
+   * Mock resource reporting.
+   */
+  public static class MockResourceCalculatorProcessTree
+      extends ResourceCalculatorProcessTree {
+
+    public MockResourceCalculatorProcessTree(String root) {
+      super(root);
+    }
+
+    @Override
+    public void updateProcessTree() {
+    }
+
+    @Override
+    public String getProcessTreeDump() {
+      return "";
+    }
+
+    @Override
+    public long getCumulativeCpuTime() {
+      return 0;
+    }
+
+    @Override
+    public boolean checkPidPgrpidForMatch() {
+      return true;
+    }
+
+    @Override
+    public long getRssMemorySize() {
+      return 1024;
+    }
+
+    @Override
+    public long getVirtualMemorySize() {
+      return 2000;
+    }
+
+    @Override
+    public float getCpuUsagePercent() {
+      return 0;
+    }
+  }
+
+  @Test
+  public void testMockResourceCalculatorProcessTree() {
+    ResourceCalculatorProcessTree tree;
+    tree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
+        "1", TestJobCounters.MockResourceCalculatorProcessTree.class,
+        new Configuration());
+    assertNotNull(tree);
+  }
+
+  /**
+   * End-to-end test of the peak memory counters.
+   * @throws IOException test failed
+   * @throws ClassNotFoundException test failed
+   * @throws InterruptedException test failed
+   */
+  @Test
+  public void testMaxCounter()
+      throws IOException, ClassNotFoundException, InterruptedException {
+    // Create a mapreduce cluster
+    MiniMRClientCluster mrCluster = MiniMRClientClusterFactory.create(
+        this.getClass(), 2, new Configuration());
+
+    try {
+      // Set up input and output paths
+      Path rootDir =
+          new Path(System.getProperty("test.build.data", "/tmp"));
+      Path testRootDir = new Path(rootDir, "testMaxCounter");
+      Path testInputDir = new Path(testRootDir, "input");
+      Path testOutputDir = new Path(testRootDir, "output");
+      FileSystem fs = FileSystem.getLocal(new Configuration());
+      fs.mkdirs(testInputDir);
+      Path testInputFile = new Path(testInputDir, "file01");
+      FSDataOutputStream stream =
+          fs.create(testInputFile);
+      stream.writeChars("foo");
+      stream.writeChars("bar");
+      stream.close();
+      fs.delete(testOutputDir, true);
+
+      // Run the job (1 mapper, 2 reducers)
+      Configuration conf = new Configuration();
+      conf.setClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
+          MockResourceCalculatorProcessTree.class,
+          ResourceCalculatorProcessTree.class);
+      Job job = Job.getInstance(conf, "word count");
+      job.setJarByClass(WordCount.class);
+      job.setMapperClass(TokenizerMapper.class);
+      job.setCombinerClass(IntSumReducer.class);
+      job.setReducerClass(IntSumReducer.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(IntWritable.class);
+      job.setNumReduceTasks(2); // use two reducers so the max spans tasks
+      org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+          .addInputPath(job, testInputDir);
+      org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+          .setOutputPath(job, testOutputDir);
+      assertTrue(job.waitForCompletion(true));
+
+      // Verify the physical memory numbers
+      org.apache.hadoop.mapreduce.Counter maxMap =
+          job.getCounters().findCounter(
+              TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX);
+      org.apache.hadoop.mapreduce.Counter maxReduce =
+          job.getCounters().findCounter(
+              TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX);
+      org.apache.hadoop.mapreduce.Counter allP =
+          job.getCounters().findCounter(
+              TaskCounter.PHYSICAL_MEMORY_BYTES);
+      assertEquals(1024, maxMap.getValue());
+      assertEquals(1024, maxReduce.getValue());
+      assertEquals(3072, allP.getValue());
+
+      // Verify the virtual memory numbers
+      org.apache.hadoop.mapreduce.Counter maxMapV =
+          job.getCounters().findCounter(
+              TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX);
+      org.apache.hadoop.mapreduce.Counter maxReduceV =
+          job.getCounters().findCounter(
+              TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX);
+      org.apache.hadoop.mapreduce.Counter allV =
+          job.getCounters().findCounter(
+              TaskCounter.VIRTUAL_MEMORY_BYTES);
+      assertEquals(2000, maxMapV.getValue());
+      assertEquals(2000, maxReduceV.getValue());
+      assertEquals(6000, allV.getValue());
+
+      // Make sure custom counters are not affected by the _MAX
+      // handling in FrameworkCounterGroup
+      org.apache.hadoop.mapreduce.Counter customCounter =
+          job.getCounters().findCounter(
+              IntSumReducer.Counters.MY_COUNTER_MAX);
+      assertEquals(200, customCounter.getValue());
+
+      fs.delete(testInputDir, true);
+      fs.delete(testOutputDir, true);
+    } finally {
+      mrCluster.stop();
+    }
+  }
 }
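For reference, the arithmetic behind testMaxCounter's assertions: every task reports the mock tree's fixed 1024-byte RSS and 2000-byte virtual size, so with one map and two reduce tasks the snapshot counters sum to 3072 and 6000, while each _MAX counter stays at the per-task peak. MY_COUNTER_MAX still reaches 200 by ordinary summing, because user counters never pass through FrameworkCounterGroup's _MAX dispatch. A sketch of the math (MaxCounterArithmetic is an illustrative name, not part of this patch):

    public class MaxCounterArithmetic {
      public static void main(String[] args) {
        long rss = 1024;  // MockResourceCalculatorProcessTree.getRssMemorySize()
        long vmem = 2000; // MockResourceCalculatorProcessTree.getVirtualMemorySize()
        int tasks = 3;    // one mapper plus two reducers

        // Snapshot counters aggregate across tasks...
        System.out.println(rss * tasks);  // 3072 == PHYSICAL_MEMORY_BYTES
        System.out.println(vmem * tasks); // 6000 == VIRTUAL_MEMORY_BYTES

        // ...while the _MAX counters take the maximum over per-task peaks.
        System.out.println(Math.max(rss, rss));   // 1024 == REDUCE_PHYSICAL_MEMORY_BYTES_MAX
        System.out.println(Math.max(vmem, vmem)); // 2000 == REDUCE_VIRTUAL_MEMORY_BYTES_MAX
      }
    }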