MAPREDUCE-7190. Add SleepJob additional parameter to make parallel runs distinguishable. Contributed by Adam Antal.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Adam Antal 2019-04-01 10:24:48 -07:00 committed by Wei-Chiu Chuang
parent 04f1db8936
commit 856cbf62d3
2 changed files with 38 additions and 6 deletions

View File

@ -49,6 +49,8 @@ public class SleepJob extends Configured implements Tool {
public static String REDUCE_SLEEP_TIME = public static String REDUCE_SLEEP_TIME =
"mapreduce.sleepjob.reduce.sleep.time"; "mapreduce.sleepjob.reduce.sleep.time";
public static final String SLEEP_JOB_NAME = "Sleep job";
public static class SleepJobPartitioner extends public static class SleepJobPartitioner extends
Partitioner<IntWritable, NullWritable> { Partitioner<IntWritable, NullWritable> {
public int getPartition(IntWritable k, NullWritable v, int numPartitions) { public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
@ -197,7 +199,15 @@ public class SleepJob extends Configured implements Tool {
public Job createJob(int numMapper, int numReducer, public Job createJob(int numMapper, int numReducer,
long mapSleepTime, int mapSleepCount, long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount) long reduceSleepTime, int reduceSleepCount) throws IOException {
return createJob(numMapper, numReducer, mapSleepTime, mapSleepCount,
reduceSleepTime, reduceSleepCount, SLEEP_JOB_NAME);
}
public Job createJob(int numMapper, int numReducer,
long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount,
String name)
throws IOException { throws IOException {
Configuration conf = getConf(); Configuration conf = getConf();
conf.setLong(MAP_SLEEP_TIME, mapSleepTime); conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
@ -205,7 +215,7 @@ public class SleepJob extends Configured implements Tool {
conf.setInt(MAP_SLEEP_COUNT, mapSleepCount); conf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount); conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
conf.setInt(MRJobConfig.NUM_MAPS, numMapper); conf.setInt(MRJobConfig.NUM_MAPS, numMapper);
Job job = Job.getInstance(conf, "sleep"); Job job = Job.getInstance(conf);
job.setNumReduceTasks(numReducer); job.setNumReduceTasks(numReducer);
job.setJarByClass(SleepJob.class); job.setJarByClass(SleepJob.class);
job.setMapperClass(SleepMapper.class); job.setMapperClass(SleepMapper.class);
@ -216,7 +226,11 @@ public class SleepJob extends Configured implements Tool {
job.setInputFormatClass(SleepInputFormat.class); job.setInputFormatClass(SleepInputFormat.class);
job.setPartitionerClass(SleepJobPartitioner.class); job.setPartitionerClass(SleepJobPartitioner.class);
job.setSpeculativeExecution(false); job.setSpeculativeExecution(false);
job.setJobName("Sleep job"); if (SLEEP_JOB_NAME.equals(name)) {
job.setJobName(SLEEP_JOB_NAME);
} else {
job.setJobName(SLEEP_JOB_NAME + " - " + name);
}
FileInputFormat.addInputPath(job, new Path("ignored")); FileInputFormat.addInputPath(job, new Path("ignored"));
return job; return job;
} }
@ -230,6 +244,7 @@ public class SleepJob extends Configured implements Tool {
int numMapper = 1, numReducer = 1; int numMapper = 1, numReducer = 1;
long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100; long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
int mapSleepCount = 1, reduceSleepCount = 1; int mapSleepCount = 1, reduceSleepCount = 1;
String name = SLEEP_JOB_NAME;
for(int i=0; i < args.length; i++ ) { for(int i=0; i < args.length; i++ ) {
if(args[i].equals("-m")) { if(args[i].equals("-m")) {
@ -262,6 +277,8 @@ public class SleepJob extends Configured implements Tool {
if (recSleepTime < 0) { if (recSleepTime < 0) {
return printUsage(recSleepTime + ": recordSleepTime must be >= 0"); return printUsage(recSleepTime + ": recordSleepTime must be >= 0");
} }
} else if (args[i].equals("-name")) {
name = args[++i];
} }
} }
@ -269,7 +286,7 @@ public class SleepJob extends Configured implements Tool {
mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime)); mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime)); reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
Job job = createJob(numMapper, numReducer, mapSleepTime, Job job = createJob(numMapper, numReducer, mapSleepTime,
mapSleepCount, reduceSleepTime, reduceSleepCount); mapSleepCount, reduceSleepTime, reduceSleepCount, name);
return job.waitForCompletion(true) ? 0 : 1; return job.waitForCompletion(true) ? 0 : 1;
} }
@ -279,7 +296,7 @@ public class SleepJob extends Configured implements Tool {
} }
System.err.println("SleepJob [-m numMapper] [-r numReducer]" + System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
" [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" + " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
" [-recordt recordSleepTime (msec)]"); " [-recordt recordSleepTime (msec)] [-name]");
ToolRunner.printGenericCommandUsage(System.err); ToolRunner.printGenericCommandUsage(System.err);
return 2; return 2;
} }

View File

@ -1379,4 +1379,19 @@ public class TestMRJobs {
} }
} }
} }
@Test
public void testSleepJobName() throws IOException {
SleepJob sleepJob = new SleepJob();
sleepJob.setConf(conf);
Job job1 = sleepJob.createJob(1, 1, 1, 1, 1, 1);
Assert.assertEquals("Wrong default name of sleep job.",
job1.getJobName(), SleepJob.SLEEP_JOB_NAME);
String expectedJob2Name = SleepJob.SLEEP_JOB_NAME + " - test";
Job job2 = sleepJob.createJob(1, 1, 1, 1, 1, 1, "test");
Assert.assertEquals("Wrong name of sleep job.",
job2.getJobName(), expectedJob2Name);
}
} }