Revert "MAPREDUCE-6829. Add peak memory usage counter for each task. (Miklos Szegedi via kasha)"
This reverts commit a91cc63d37
.
This commit is contained in:
parent
b9640c02bf
commit
67d31a8e7e
|
@ -111,11 +111,7 @@ abstract public class Task implements Writable, Configurable {
|
||||||
CPU_MILLISECONDS,
|
CPU_MILLISECONDS,
|
||||||
PHYSICAL_MEMORY_BYTES,
|
PHYSICAL_MEMORY_BYTES,
|
||||||
VIRTUAL_MEMORY_BYTES,
|
VIRTUAL_MEMORY_BYTES,
|
||||||
COMMITTED_HEAP_BYTES,
|
COMMITTED_HEAP_BYTES
|
||||||
MAP_PHYSICAL_MEMORY_BYTES_MAX,
|
|
||||||
MAP_VIRTUAL_MEMORY_BYTES_MAX,
|
|
||||||
REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
|
|
||||||
REDUCE_VIRTUAL_MEMORY_BYTES_MAX
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -953,24 +949,6 @@ abstract public class Task implements Writable, Configurable {
|
||||||
if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
||||||
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
|
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
|
||||||
TaskCounter counter = isMapTask() ?
|
|
||||||
TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX :
|
|
||||||
TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX;
|
|
||||||
Counters.Counter pMemCounter =
|
|
||||||
counters.findCounter(counter);
|
|
||||||
pMemCounter.setValue(Math.max(pMemCounter.getValue(), pMem));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
|
|
||||||
TaskCounter counter = isMapTask() ?
|
|
||||||
TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX :
|
|
||||||
TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
|
|
||||||
Counters.Counter vMemCounter =
|
|
||||||
counters.findCounter(counter);
|
|
||||||
vMemCounter.setValue(Math.max(vMemCounter.getValue(), vMem));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -47,9 +47,5 @@ public enum TaskCounter {
|
||||||
CPU_MILLISECONDS,
|
CPU_MILLISECONDS,
|
||||||
PHYSICAL_MEMORY_BYTES,
|
PHYSICAL_MEMORY_BYTES,
|
||||||
VIRTUAL_MEMORY_BYTES,
|
VIRTUAL_MEMORY_BYTES,
|
||||||
COMMITTED_HEAP_BYTES,
|
COMMITTED_HEAP_BYTES
|
||||||
MAP_PHYSICAL_MEMORY_BYTES_MAX,
|
|
||||||
MAP_VIRTUAL_MEMORY_BYTES_MAX,
|
|
||||||
REDUCE_PHYSICAL_MEMORY_BYTES_MAX,
|
|
||||||
REDUCE_VIRTUAL_MEMORY_BYTES_MAX;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,11 +100,7 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void increment(long incr) {
|
public void increment(long incr) {
|
||||||
if (key.name().endsWith("_MAX")) {
|
value += incr;
|
||||||
value = value > incr ? value : incr;
|
|
||||||
} else {
|
|
||||||
value += incr;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -37,7 +37,3 @@ COMMITTED_HEAP_BYTES.name= Total committed heap usage (bytes)
|
||||||
CPU_MILLISECONDS.name= CPU time spent (ms)
|
CPU_MILLISECONDS.name= CPU time spent (ms)
|
||||||
PHYSICAL_MEMORY_BYTES.name= Physical memory (bytes) snapshot
|
PHYSICAL_MEMORY_BYTES.name= Physical memory (bytes) snapshot
|
||||||
VIRTUAL_MEMORY_BYTES.name= Virtual memory (bytes) snapshot
|
VIRTUAL_MEMORY_BYTES.name= Virtual memory (bytes) snapshot
|
||||||
MAP_PHYSICAL_MEMORY_BYTES_MAX.name= Peak Map Physical memory (bytes)
|
|
||||||
MAP_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Map Virtual memory (bytes)
|
|
||||||
REDUCE_PHYSICAL_MEMORY_BYTES_MAX.name=Peak Reduce Physical memory (bytes)
|
|
||||||
REDUCE_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Reduce Virtual memory (bytes)
|
|
||||||
|
|
|
@ -349,35 +349,6 @@ public class TestCounters {
|
||||||
Assert.assertNull(count2);
|
Assert.assertNull(count2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
|
||||||
@Test
|
|
||||||
public void testTaskCounter() {
|
|
||||||
GroupFactory groupFactory = new GroupFactoryForTest();
|
|
||||||
FrameworkGroupFactory frameworkGroupFactory =
|
|
||||||
groupFactory.newFrameworkGroupFactory(TaskCounter.class);
|
|
||||||
Group group = (Group) frameworkGroupFactory.newGroup("TaskCounter");
|
|
||||||
|
|
||||||
FrameworkCounterGroup counterGroup =
|
|
||||||
(FrameworkCounterGroup) group.getUnderlyingGroup();
|
|
||||||
|
|
||||||
org.apache.hadoop.mapreduce.Counter count1 =
|
|
||||||
counterGroup.findCounter(
|
|
||||||
TaskCounter.PHYSICAL_MEMORY_BYTES.toString());
|
|
||||||
Assert.assertNotNull(count1);
|
|
||||||
count1.increment(10);
|
|
||||||
count1.increment(10);
|
|
||||||
Assert.assertEquals(20, count1.getValue());
|
|
||||||
|
|
||||||
// Verify no exception get thrown when finding an unknown counter
|
|
||||||
org.apache.hadoop.mapreduce.Counter count2 =
|
|
||||||
counterGroup.findCounter(
|
|
||||||
TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX.toString());
|
|
||||||
Assert.assertNotNull(count2);
|
|
||||||
count2.increment(5);
|
|
||||||
count2.increment(10);
|
|
||||||
Assert.assertEquals(10, count2.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFilesystemCounter() {
|
public void testFilesystemCounter() {
|
||||||
GroupFactory groupFactory = new GroupFactoryForTest();
|
GroupFactory groupFactory = new GroupFactoryForTest();
|
||||||
|
|
|
@ -40,12 +40,10 @@ import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.WritableComparable;
|
import org.apache.hadoop.io.WritableComparable;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
|
||||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
|
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
|
||||||
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -753,190 +751,4 @@ public class TestJobCounters {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Test mapper.
|
|
||||||
*/
|
|
||||||
public static class TokenizerMapper extends
|
|
||||||
org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
|
|
||||||
|
|
||||||
private final static IntWritable ONE = new IntWritable(1);
|
|
||||||
private Text word = new Text();
|
|
||||||
|
|
||||||
public void map(Object key, Text value, Context context)
|
|
||||||
throws IOException, InterruptedException {
|
|
||||||
StringTokenizer itr = new StringTokenizer(value.toString());
|
|
||||||
while (itr.hasMoreTokens()) {
|
|
||||||
word.set(itr.nextToken());
|
|
||||||
context.write(word, ONE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test reducer.
|
|
||||||
*/
|
|
||||||
public static class IntSumReducer extends
|
|
||||||
org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable>{
|
|
||||||
/**
|
|
||||||
* Test customer counter.
|
|
||||||
*/
|
|
||||||
public enum Counters { MY_COUNTER_MAX }
|
|
||||||
private IntWritable result = new IntWritable();
|
|
||||||
|
|
||||||
public void reduce(Text key, Iterable<IntWritable> values, Context context)
|
|
||||||
throws IOException, InterruptedException {
|
|
||||||
int sum = 0;
|
|
||||||
for (IntWritable val : values) {
|
|
||||||
sum += val.get();
|
|
||||||
}
|
|
||||||
result.set(sum);
|
|
||||||
context.write(key, result);
|
|
||||||
context.getCounter(Counters.MY_COUNTER_MAX).increment(100);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Mock resource reporting.
|
|
||||||
*/
|
|
||||||
public static class MockResourceCalculatorProcessTree
|
|
||||||
extends ResourceCalculatorProcessTree {
|
|
||||||
|
|
||||||
public MockResourceCalculatorProcessTree(String root) {
|
|
||||||
super(root);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void updateProcessTree() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getProcessTreeDump() {
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getCumulativeCpuTime() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean checkPidPgrpidForMatch() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getRssMemorySize() {
|
|
||||||
return 1024;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getVirtualMemorySize() {
|
|
||||||
return 2000;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public float getCpuUsagePercent() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMockResourceCalculatorProcessTree() {
|
|
||||||
ResourceCalculatorProcessTree tree;
|
|
||||||
tree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
|
|
||||||
"1", TestJobCounters.MockResourceCalculatorProcessTree.class,
|
|
||||||
new Configuration());
|
|
||||||
assertNotNull(tree);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* End to end test of maximum counters.
|
|
||||||
* @throws IOException test failed
|
|
||||||
* @throws ClassNotFoundException test failed
|
|
||||||
* @throws InterruptedException test failed
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testMaxCounter()
|
|
||||||
throws IOException, ClassNotFoundException, InterruptedException {
|
|
||||||
// Create mapreduce cluster
|
|
||||||
MiniMRClientCluster mrCluster = MiniMRClientClusterFactory.create(
|
|
||||||
this.getClass(), 2, new Configuration());
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Setup input and output paths
|
|
||||||
Path rootDir =
|
|
||||||
new Path(System.getProperty("test.build.data", "/tmp"));
|
|
||||||
Path testRootDir = new Path(rootDir, "testMaxCounter");
|
|
||||||
Path testInputDir = new Path(testRootDir, "input");
|
|
||||||
Path testOutputDir = new Path(testRootDir, "output");
|
|
||||||
FileSystem fs = FileSystem.getLocal(new Configuration());
|
|
||||||
fs.mkdirs(testInputDir);
|
|
||||||
Path testInputFile = new Path(testInputDir, "file01");
|
|
||||||
FSDataOutputStream stream =
|
|
||||||
fs.create(testInputFile);
|
|
||||||
stream.writeChars("foo");
|
|
||||||
stream.writeChars("bar");
|
|
||||||
stream.close();
|
|
||||||
fs.delete(testOutputDir, true);
|
|
||||||
|
|
||||||
// Run job (1 mapper, 2 reducers)
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
conf.setClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
|
|
||||||
MockResourceCalculatorProcessTree.class,
|
|
||||||
ResourceCalculatorProcessTree.class);
|
|
||||||
Job job = Job.getInstance(conf, "word count");
|
|
||||||
job.setJarByClass(WordCount.class);
|
|
||||||
job.setMapperClass(TokenizerMapper.class);
|
|
||||||
job.setCombinerClass(IntSumReducer.class);
|
|
||||||
job.setReducerClass(IntSumReducer.class);
|
|
||||||
job.setOutputKeyClass(Text.class);
|
|
||||||
job.setOutputValueClass(IntWritable.class);
|
|
||||||
job.setNumReduceTasks(2); // make sure we have double here to test max
|
|
||||||
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
|
|
||||||
.addInputPath(job, testInputDir);
|
|
||||||
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
|
|
||||||
.setOutputPath(job, testOutputDir);
|
|
||||||
assertTrue(job.waitForCompletion(true));
|
|
||||||
|
|
||||||
// Verify physical numbers
|
|
||||||
org.apache.hadoop.mapreduce.Counter maxMap =
|
|
||||||
job.getCounters().findCounter(
|
|
||||||
TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX);
|
|
||||||
org.apache.hadoop.mapreduce.Counter maxReduce =
|
|
||||||
job.getCounters().findCounter(
|
|
||||||
TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX);
|
|
||||||
org.apache.hadoop.mapreduce.Counter allP =
|
|
||||||
job.getCounters().findCounter(
|
|
||||||
TaskCounter.PHYSICAL_MEMORY_BYTES);
|
|
||||||
assertEquals(1024, maxMap.getValue());
|
|
||||||
assertEquals(1024, maxReduce.getValue());
|
|
||||||
assertEquals(3072, allP.getValue());
|
|
||||||
|
|
||||||
// Verify virtual numbers
|
|
||||||
org.apache.hadoop.mapreduce.Counter maxMapV =
|
|
||||||
job.getCounters().findCounter(
|
|
||||||
TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX);
|
|
||||||
org.apache.hadoop.mapreduce.Counter maxReduceV =
|
|
||||||
job.getCounters().findCounter(
|
|
||||||
TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX);
|
|
||||||
org.apache.hadoop.mapreduce.Counter allV =
|
|
||||||
job.getCounters().findCounter(
|
|
||||||
TaskCounter.VIRTUAL_MEMORY_BYTES);
|
|
||||||
assertEquals(2000, maxMapV.getValue());
|
|
||||||
assertEquals(2000, maxReduceV.getValue());
|
|
||||||
assertEquals(6000, allV.getValue());
|
|
||||||
|
|
||||||
// Make sure customer counters are not affected by the _MAX
|
|
||||||
// code in FrameworkCountersGroup
|
|
||||||
org.apache.hadoop.mapreduce.Counter customerCounter =
|
|
||||||
job.getCounters().findCounter(
|
|
||||||
IntSumReducer.Counters.MY_COUNTER_MAX);
|
|
||||||
assertEquals(200, customerCounter.getValue());
|
|
||||||
|
|
||||||
fs.delete(testInputDir, true);
|
|
||||||
fs.delete(testOutputDir, true);
|
|
||||||
} finally {
|
|
||||||
mrCluster.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue