MAPREDUCE-7190. Add SleepJob additional parameter to make parallel runs distinguishable. Contributed by Adam Antal.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 856cbf62d3)
This commit is contained in:
Adam Antal 2019-04-01 10:24:48 -07:00 committed by Wei-Chiu Chuang
parent 67cdf807a2
commit ddf8859655
2 changed files with 38 additions and 6 deletions

View File

@ -49,6 +49,8 @@ public class SleepJob extends Configured implements Tool {
public static String REDUCE_SLEEP_TIME = public static String REDUCE_SLEEP_TIME =
"mapreduce.sleepjob.reduce.sleep.time"; "mapreduce.sleepjob.reduce.sleep.time";
public static final String SLEEP_JOB_NAME = "Sleep job";
public static class SleepJobPartitioner extends public static class SleepJobPartitioner extends
Partitioner<IntWritable, NullWritable> { Partitioner<IntWritable, NullWritable> {
public int getPartition(IntWritable k, NullWritable v, int numPartitions) { public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
@ -195,9 +197,17 @@ public class SleepJob extends Configured implements Tool {
System.exit(res); System.exit(res);
} }
public Job createJob(int numMapper, int numReducer,
long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount) throws IOException {
return createJob(numMapper, numReducer, mapSleepTime, mapSleepCount,
reduceSleepTime, reduceSleepCount, SLEEP_JOB_NAME);
}
public Job createJob(int numMapper, int numReducer, public Job createJob(int numMapper, int numReducer,
long mapSleepTime, int mapSleepCount, long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount) long reduceSleepTime, int reduceSleepCount,
String name)
throws IOException { throws IOException {
Configuration conf = getConf(); Configuration conf = getConf();
conf.setLong(MAP_SLEEP_TIME, mapSleepTime); conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
@ -205,7 +215,7 @@ public class SleepJob extends Configured implements Tool {
conf.setInt(MAP_SLEEP_COUNT, mapSleepCount); conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount); conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
conf.setInt(MRJobConfig.NUM_MAPS, numMapper); conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
Job job = Job.getInstance(conf, "sleep"); Job job = Job.getInstance(conf);
job.setNumReduceTasks(numReducer); job.setNumReduceTasks(numReducer);
job.setJarByClass(SleepJob.class); job.setJarByClass(SleepJob.class);
job.setMapperClass(SleepMapper.class); job.setMapperClass(SleepMapper.class);
@ -216,7 +226,11 @@ public class SleepJob extends Configured implements Tool {
job.setInputFormatClass(SleepInputFormat.class); job.setInputFormatClass(SleepInputFormat.class);
job.setPartitionerClass(SleepJobPartitioner.class); job.setPartitionerClass(SleepJobPartitioner.class);
job.setSpeculativeExecution(false); job.setSpeculativeExecution(false);
job.setJobName("Sleep job"); if (SLEEP_JOB_NAME.equals(name)) {
job.setJobName(SLEEP_JOB_NAME);
} else {
job.setJobName(SLEEP_JOB_NAME + " - " + name);
}
FileInputFormat.addInputPath(job, new Path("ignored")); FileInputFormat.addInputPath(job, new Path("ignored"));
return job; return job;
} }
@ -230,6 +244,7 @@ public class SleepJob extends Configured implements Tool {
int numMapper = 1, numReducer = 1; int numMapper = 1, numReducer = 1;
long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100; long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
int mapSleepCount = 1, reduceSleepCount = 1; int mapSleepCount = 1, reduceSleepCount = 1;
String name = SLEEP_JOB_NAME;
for(int i=0; i < args.length; i++ ) { for(int i=0; i < args.length; i++ ) {
if(args[i].equals("-m")) { if(args[i].equals("-m")) {
@ -262,6 +277,8 @@ public class SleepJob extends Configured implements Tool {
if (recSleepTime < 0) { if (recSleepTime < 0) {
return printUsage(recSleepTime + ": recordSleepTime must be >= 0"); return printUsage(recSleepTime + ": recordSleepTime must be >= 0");
} }
} else if (args[i].equals("-name")) {
name = args[++i];
} }
} }
@ -269,7 +286,7 @@ public class SleepJob extends Configured implements Tool {
mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime)); mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime)); reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
Job job = createJob(numMapper, numReducer, mapSleepTime, Job job = createJob(numMapper, numReducer, mapSleepTime,
mapSleepCount, reduceSleepTime, reduceSleepCount); mapSleepCount, reduceSleepTime, reduceSleepCount, name);
return job.waitForCompletion(true) ? 0 : 1; return job.waitForCompletion(true) ? 0 : 1;
} }
@ -279,7 +296,7 @@ public class SleepJob extends Configured implements Tool {
} }
System.err.println("SleepJob [-m numMapper] [-r numReducer]" + System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
" [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" + " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
" [-recordt recordSleepTime (msec)]"); " [-recordt recordSleepTime (msec)] [-name]");
ToolRunner.printGenericCommandUsage(System.err); ToolRunner.printGenericCommandUsage(System.err);
return 2; return 2;
} }

View File

@ -1379,4 +1379,19 @@ public class TestMRJobs {
} }
} }
} }
@Test
public void testSleepJobName() throws IOException {
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(conf);
Job job1 = sleepJob.createJob(1, 1, 1, 1, 1, 1);
Assert.assertEquals("Wrong default name of sleep job.",
job1.getJobName(), SleepJob.SLEEP_JOB_NAME);
String expectedJob2Name = SleepJob.SLEEP_JOB_NAME + " - test";
Job job2 = sleepJob.createJob(1, 1, 1, 1, 1, 1, "test");
Assert.assertEquals("Wrong name of sleep job.",
job2.getJobName(), expectedJob2Name);
}
} }