diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/GrowingSleepJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/GrowingSleepJob.java new file mode 100644 index 00000000000..55740f71311 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/GrowingSleepJob.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.util.ToolRunner; + +import java.io.IOException; +import java.util.ArrayList; + +/** + * A sleep job whose mappers create 1MB buffer for every record. + */ +public class GrowingSleepJob extends SleepJob { + private static final Log LOG = LogFactory.getLog(GrowingSleepJob.class); + + public static class GrowingSleepMapper extends SleepMapper { + private final int MB = 1024 * 1024; + private ArrayList bytes = new ArrayList<>(); + + @Override + public void map(IntWritable key, IntWritable value, Context context) + throws IOException, InterruptedException { + super.map(key, value, context); + long free = Runtime.getRuntime().freeMemory(); + if (free > 32 * MB) { + LOG.info("Free memory = " + free + + " bytes. Creating 1 MB on the heap."); + bytes.add(new byte[MB]); + } + } + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new GrowingSleepJob(), args); + System.exit(res); + } + + @Override + public Job createJob(int numMapper, int numReducer, + long mapSleepTime, int mapSleepCount, + long reduceSleepTime, int reduceSleepCount) + throws IOException { + Job job = super.createJob(numMapper, numReducer, mapSleepTime, + mapSleepCount, reduceSleepTime, reduceSleepCount); + job.setMapperClass(GrowingSleepMapper.class); + job.setJobName("Growing sleep job"); + return job; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java index 76198b82849..a2a13d5ba91 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java @@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.TestTextInputFormat; import org.apache.hadoop.mapred.ThreadedMapBenchmark; import org.apache.hadoop.mapreduce.FailJob; +import org.apache.hadoop.mapreduce.GrowingSleepJob; import org.apache.hadoop.mapreduce.LargeSorter; import org.apache.hadoop.mapreduce.MiniHadoopClusterManager; import org.apache.hadoop.mapreduce.SleepJob; @@ -90,6 +91,8 @@ public MapredTestDriver(ProgramDriver pgd) { pgd.addClass("fail", FailJob.class, "a job that always fails"); pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and reduce task."); + pgd.addClass("gsleep", GrowingSleepJob.class, + "A sleep job whose mappers create 1MB buffer for every record."); pgd.addClass("timelineperformance", TimelineServicePerformance.class, "A job that launches mappers to test timline service " + "performance.");