From 856cbf62d3fd1c885ae8374f08d7c18c83941791 Mon Sep 17 00:00:00 2001 From: Adam Antal Date: Mon, 1 Apr 2019 10:24:48 -0700 Subject: [PATCH] MAPREDUCE-7190. Add SleepJob additional parameter to make parallel runs distinguishable. Contributed by Adam Antal. Signed-off-by: Wei-Chiu Chuang --- .../org/apache/hadoop/mapreduce/SleepJob.java | 29 +++++++++++++++---- .../hadoop/mapreduce/v2/TestMRJobs.java | 15 ++++++++++ 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SleepJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SleepJob.java index 2b321833566..b3f2712c50c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SleepJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SleepJob.java @@ -49,6 +49,8 @@ public class SleepJob extends Configured implements Tool { public static String REDUCE_SLEEP_TIME = "mapreduce.sleepjob.reduce.sleep.time"; + public static final String SLEEP_JOB_NAME = "Sleep job"; + public static class SleepJobPartitioner extends Partitioner { public int getPartition(IntWritable k, NullWritable v, int numPartitions) { @@ -195,9 +197,17 @@ public class SleepJob extends Configured implements Tool { System.exit(res); } - public Job createJob(int numMapper, int numReducer, + public Job createJob(int numMapper, int numReducer, + long mapSleepTime, int mapSleepCount, + long reduceSleepTime, int reduceSleepCount) throws IOException { + return createJob(numMapper, numReducer, mapSleepTime, mapSleepCount, + reduceSleepTime, reduceSleepCount, SLEEP_JOB_NAME); + } + + public Job createJob(int numMapper, int numReducer, long mapSleepTime, int mapSleepCount, - long reduceSleepTime, int reduceSleepCount) + long reduceSleepTime, int reduceSleepCount, + String name) throws IOException { Configuration conf = getConf(); conf.setLong(MAP_SLEEP_TIME, mapSleepTime); @@ -205,7 +215,7 @@ public class SleepJob extends Configured implements Tool { conf.setInt(MAP_SLEEP_COUNT, mapSleepCount); conf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount); conf.setInt(MRJobConfig.NUM_MAPS, numMapper); - Job job = Job.getInstance(conf, "sleep"); + Job job = Job.getInstance(conf); job.setNumReduceTasks(numReducer); job.setJarByClass(SleepJob.class); job.setMapperClass(SleepMapper.class); @@ -216,7 +226,11 @@ public class SleepJob extends Configured implements Tool { job.setInputFormatClass(SleepInputFormat.class); job.setPartitionerClass(SleepJobPartitioner.class); job.setSpeculativeExecution(false); - job.setJobName("Sleep job"); + if (SLEEP_JOB_NAME.equals(name)) { + job.setJobName(SLEEP_JOB_NAME); + } else { + job.setJobName(SLEEP_JOB_NAME + " - " + name); + } FileInputFormat.addInputPath(job, new Path("ignored")); return job; } @@ -230,6 +244,7 @@ public class SleepJob extends Configured implements Tool { int numMapper = 1, numReducer = 1; long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100; int mapSleepCount = 1, reduceSleepCount = 1; + String name = SLEEP_JOB_NAME; for(int i=0; i < args.length; i++ ) { if(args[i].equals("-m")) { @@ -262,6 +277,8 @@ public class SleepJob extends Configured implements Tool { if (recSleepTime < 0) { return printUsage(recSleepTime + ": recordSleepTime must be >= 0"); } + } else if (args[i].equals("-name")) { + name = args[++i]; } } @@ -269,7 +286,7 @@ public class SleepJob extends Configured implements Tool { mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime)); reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime)); Job job = createJob(numMapper, numReducer, mapSleepTime, - mapSleepCount, reduceSleepTime, reduceSleepCount); + mapSleepCount, reduceSleepTime, reduceSleepCount, name); return job.waitForCompletion(true) ? 0 : 1; } @@ -279,7 +296,7 @@ public class SleepJob extends Configured implements Tool { } System.err.println("SleepJob [-m numMapper] [-r numReducer]" + " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" + - " [-recordt recordSleepTime (msec)]"); + " [-recordt recordSleepTime (msec)] [-name]"); ToolRunner.printGenericCommandUsage(System.err); return 2; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index c8b40037d51..33bbb86bf58 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -1379,4 +1379,19 @@ public class TestMRJobs { } } } + + @Test + public void testSleepJobName() throws IOException { + SleepJob sleepJob = new SleepJob(); + sleepJob.setConf(conf); + + Job job1 = sleepJob.createJob(1, 1, 1, 1, 1, 1); + Assert.assertEquals("Wrong default name of sleep job.", + job1.getJobName(), SleepJob.SLEEP_JOB_NAME); + + String expectedJob2Name = SleepJob.SLEEP_JOB_NAME + " - test"; + Job job2 = sleepJob.createJob(1, 1, 1, 1, 1, 1, "test"); + Assert.assertEquals("Wrong name of sleep job.", + job2.getJobName(), expectedJob2Name); + } }