MAPREDUCE-6829. Add peak memory usage counter for each task. (Miklos Szegedi via kasha)
This commit is contained in:
parent
44606aa850
commit
c65f884fc7
|
@ -110,7 +110,11 @@ abstract public class Task implements Writable, Configurable {
|
|||
CPU_MILLISECONDS,
|
||||
PHYSICAL_MEMORY_BYTES,
|
||||
VIRTUAL_MEMORY_BYTES,
|
||||
COMMITTED_HEAP_BYTES
|
||||
COMMITTED_HEAP_BYTES,
|
||||
MAP_PHYSICAL_MEMORY_BYTES_MAX,
|
||||
MAP_VIRTUAL_MEMORY_BYTES_MAX,
|
||||
REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
|
||||
REDUCE_VIRTUAL_MEMORY_BYTES_MAX
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -964,6 +968,24 @@ abstract public class Task implements Writable, Configurable {
|
|||
if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
||||
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
|
||||
}
|
||||
|
||||
if (pMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
||||
TaskCounter counter = isMapTask() ?
|
||||
TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX :
|
||||
TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX;
|
||||
Counters.Counter pMemCounter =
|
||||
counters.findCounter(counter);
|
||||
pMemCounter.setValue(Math.max(pMemCounter.getValue(), pMem));
|
||||
}
|
||||
|
||||
if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
||||
TaskCounter counter = isMapTask() ?
|
||||
TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX :
|
||||
TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
|
||||
Counters.Counter vMemCounter =
|
||||
counters.findCounter(counter);
|
||||
vMemCounter.setValue(Math.max(vMemCounter.getValue(), vMem));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -47,5 +47,9 @@ public enum TaskCounter {
|
|||
CPU_MILLISECONDS,
|
||||
PHYSICAL_MEMORY_BYTES,
|
||||
VIRTUAL_MEMORY_BYTES,
|
||||
COMMITTED_HEAP_BYTES
|
||||
COMMITTED_HEAP_BYTES,
|
||||
MAP_PHYSICAL_MEMORY_BYTES_MAX,
|
||||
MAP_VIRTUAL_MEMORY_BYTES_MAX,
|
||||
REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
|
||||
REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
|
||||
}
|
||||
|
|
|
@ -100,8 +100,12 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
|
|||
|
||||
@Override
|
||||
public void increment(long incr) {
|
||||
if (key.name().endsWith("_MAX")) {
|
||||
value = value > incr ? value : incr;
|
||||
} else {
|
||||
value += incr;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
|
|
|
@ -37,3 +37,7 @@ COMMITTED_HEAP_BYTES.name= Total committed heap usage (bytes)
|
|||
CPU_MILLISECONDS.name= CPU time spent (ms)
|
||||
PHYSICAL_MEMORY_BYTES.name= Physical memory (bytes) snapshot
|
||||
VIRTUAL_MEMORY_BYTES.name= Virtual memory (bytes) snapshot
|
||||
MAP_PHYSICAL_MEMORY_BYTES_MAX.name= Peak Map Physical memory (bytes)
|
||||
MAP_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Map Virtual memory (bytes)
|
||||
REDUCE_PHYSICAL_MEMORY_BYTES_MAX.name=Peak Reduce Physical memory (bytes)
|
||||
REDUCE_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Reduce Virtual memory (bytes)
|
||||
|
|
|
@ -349,6 +349,35 @@ public class TestCounters {
|
|||
Assert.assertNull(count2);
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void testTaskCounter() {
|
||||
GroupFactory groupFactory = new GroupFactoryForTest();
|
||||
FrameworkGroupFactory frameworkGroupFactory =
|
||||
groupFactory.newFrameworkGroupFactory(TaskCounter.class);
|
||||
Group group = (Group) frameworkGroupFactory.newGroup("TaskCounter");
|
||||
|
||||
FrameworkCounterGroup counterGroup =
|
||||
(FrameworkCounterGroup) group.getUnderlyingGroup();
|
||||
|
||||
org.apache.hadoop.mapreduce.Counter count1 =
|
||||
counterGroup.findCounter(
|
||||
TaskCounter.PHYSICAL_MEMORY_BYTES.toString());
|
||||
Assert.assertNotNull(count1);
|
||||
count1.increment(10);
|
||||
count1.increment(10);
|
||||
Assert.assertEquals(20, count1.getValue());
|
||||
|
||||
// Verify no exception get thrown when finding an unknown counter
|
||||
org.apache.hadoop.mapreduce.Counter count2 =
|
||||
counterGroup.findCounter(
|
||||
TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX.toString());
|
||||
Assert.assertNotNull(count2);
|
||||
count2.increment(5);
|
||||
count2.increment(10);
|
||||
Assert.assertEquals(10, count2.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilesystemCounter() {
|
||||
GroupFactory groupFactory = new GroupFactoryForTest();
|
||||
|
|
|
@ -40,10 +40,12 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
|
||||
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -751,4 +753,190 @@ public class TestJobCounters {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test mapper.
|
||||
*/
|
||||
public static class TokenizerMapper extends
|
||||
org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
|
||||
|
||||
private final static IntWritable ONE = new IntWritable(1);
|
||||
private Text word = new Text();
|
||||
|
||||
public void map(Object key, Text value, Context context)
|
||||
throws IOException, InterruptedException {
|
||||
StringTokenizer itr = new StringTokenizer(value.toString());
|
||||
while (itr.hasMoreTokens()) {
|
||||
word.set(itr.nextToken());
|
||||
context.write(word, ONE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test reducer.
|
||||
*/
|
||||
public static class IntSumReducer extends
|
||||
org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable>{
|
||||
/**
|
||||
* Test customer counter.
|
||||
*/
|
||||
public enum Counters { MY_COUNTER_MAX }
|
||||
private IntWritable result = new IntWritable();
|
||||
|
||||
public void reduce(Text key, Iterable<IntWritable> values, Context context)
|
||||
throws IOException, InterruptedException {
|
||||
int sum = 0;
|
||||
for (IntWritable val : values) {
|
||||
sum += val.get();
|
||||
}
|
||||
result.set(sum);
|
||||
context.write(key, result);
|
||||
context.getCounter(Counters.MY_COUNTER_MAX).increment(100);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mock resource reporting.
|
||||
*/
|
||||
public static class MockResourceCalculatorProcessTree
|
||||
extends ResourceCalculatorProcessTree {
|
||||
|
||||
public MockResourceCalculatorProcessTree(String root) {
|
||||
super(root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateProcessTree() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProcessTreeDump() {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCumulativeCpuTime() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkPidPgrpidForMatch() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRssMemorySize() {
|
||||
return 1024;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getVirtualMemorySize() {
|
||||
return 2000;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getCpuUsagePercent() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMockResourceCalculatorProcessTree() {
|
||||
ResourceCalculatorProcessTree tree;
|
||||
tree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
|
||||
"1", TestJobCounters.MockResourceCalculatorProcessTree.class,
|
||||
new Configuration());
|
||||
assertNotNull(tree);
|
||||
}
|
||||
|
||||
/**
|
||||
* End to end test of maximum counters.
|
||||
* @throws IOException test failed
|
||||
* @throws ClassNotFoundException test failed
|
||||
* @throws InterruptedException test failed
|
||||
*/
|
||||
@Test
|
||||
public void testMaxCounter()
|
||||
throws IOException, ClassNotFoundException, InterruptedException {
|
||||
// Create mapreduce cluster
|
||||
MiniMRClientCluster mrCluster = MiniMRClientClusterFactory.create(
|
||||
this.getClass(), 2, new Configuration());
|
||||
|
||||
try {
|
||||
// Setup input and output paths
|
||||
Path rootDir =
|
||||
new Path(System.getProperty("test.build.data", "/tmp"));
|
||||
Path testRootDir = new Path(rootDir, "testMaxCounter");
|
||||
Path testInputDir = new Path(testRootDir, "input");
|
||||
Path testOutputDir = new Path(testRootDir, "output");
|
||||
FileSystem fs = FileSystem.getLocal(new Configuration());
|
||||
fs.mkdirs(testInputDir);
|
||||
Path testInputFile = new Path(testInputDir, "file01");
|
||||
FSDataOutputStream stream =
|
||||
fs.create(testInputFile);
|
||||
stream.writeChars("foo");
|
||||
stream.writeChars("bar");
|
||||
stream.close();
|
||||
fs.delete(testOutputDir, true);
|
||||
|
||||
// Run job (1 mapper, 2 reducers)
|
||||
Configuration conf = new Configuration();
|
||||
conf.setClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
|
||||
MockResourceCalculatorProcessTree.class,
|
||||
ResourceCalculatorProcessTree.class);
|
||||
Job job = Job.getInstance(conf, "word count");
|
||||
job.setJarByClass(WordCount.class);
|
||||
job.setMapperClass(TokenizerMapper.class);
|
||||
job.setCombinerClass(IntSumReducer.class);
|
||||
job.setReducerClass(IntSumReducer.class);
|
||||
job.setOutputKeyClass(Text.class);
|
||||
job.setOutputValueClass(IntWritable.class);
|
||||
job.setNumReduceTasks(2); // make sure we have double here to test max
|
||||
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
|
||||
.addInputPath(job, testInputDir);
|
||||
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
|
||||
.setOutputPath(job, testOutputDir);
|
||||
assertTrue(job.waitForCompletion(true));
|
||||
|
||||
// Verify physical numbers
|
||||
org.apache.hadoop.mapreduce.Counter maxMap =
|
||||
job.getCounters().findCounter(
|
||||
TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX);
|
||||
org.apache.hadoop.mapreduce.Counter maxReduce =
|
||||
job.getCounters().findCounter(
|
||||
TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX);
|
||||
org.apache.hadoop.mapreduce.Counter allP =
|
||||
job.getCounters().findCounter(
|
||||
TaskCounter.PHYSICAL_MEMORY_BYTES);
|
||||
assertEquals(1024, maxMap.getValue());
|
||||
assertEquals(1024, maxReduce.getValue());
|
||||
assertEquals(3072, allP.getValue());
|
||||
|
||||
// Verify virtual numbers
|
||||
org.apache.hadoop.mapreduce.Counter maxMapV =
|
||||
job.getCounters().findCounter(
|
||||
TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX);
|
||||
org.apache.hadoop.mapreduce.Counter maxReduceV =
|
||||
job.getCounters().findCounter(
|
||||
TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX);
|
||||
org.apache.hadoop.mapreduce.Counter allV =
|
||||
job.getCounters().findCounter(
|
||||
TaskCounter.VIRTUAL_MEMORY_BYTES);
|
||||
assertEquals(2000, maxMapV.getValue());
|
||||
assertEquals(2000, maxReduceV.getValue());
|
||||
assertEquals(6000, allV.getValue());
|
||||
|
||||
// Make sure customer counters are not affected by the _MAX
|
||||
// code in FrameworkCountersGroup
|
||||
org.apache.hadoop.mapreduce.Counter customerCounter =
|
||||
job.getCounters().findCounter(
|
||||
IntSumReducer.Counters.MY_COUNTER_MAX);
|
||||
assertEquals(200, customerCounter.getValue());
|
||||
|
||||
fs.delete(testInputDir, true);
|
||||
fs.delete(testOutputDir, true);
|
||||
} finally {
|
||||
mrCluster.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue