diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index 3ae1e746fcb..dc563eeab4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -153,10 +153,10 @@ private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
     FileSystem localFs = FileSystem.getLocal(job).getRaw();
     FSDataInputStream inStream = localFs.open(mapOutputFileName);
     try {
+      inStream.seek(ir.startOffset);
       inStream =
           IntermediateEncryptedStream.wrapIfNecessary(job, inStream,
               mapOutputFileName);
-      inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
       mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
           decompressedLength, metrics, reporter);
     } finally {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
index 79fcd4110ca..fbee7ef5c0f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
@@ -44,6 +44,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,9 +76,20 @@
  * mbs-per-map specifies the amount of data (in MBs) to generate per map.
  * By default, this is twice the value of mapreduce.task.io.sort.mb
  * map-tasks specifies the number of map tasks to run.
+ * Steps of the unit test:
+ * 1- Generate random input text.
+ * 2- Run a job with encryption disabled and store the checksum of its output
+ *    file in checkSumReference.
+ * 3- Run the job with encryption enabled.
+ * 4- Compare checkSumReference to the checksum of the job output.
+ * 5- If the job has multiple reducers, the test launches one final job to
+ *    combine the output files into a single one.
+ * 6- Verify that the map tasks spilled files to disk.
  */
 @RunWith(Parameterized.class)
 public class TestMRIntermediateDataEncryption {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
   /**
    * The number of bytes generated by the input generator.
    */
@@ -86,8 +98,6 @@ public class TestMRIntermediateDataEncryption {
   public static final int INPUT_GEN_NUM_THREADS = 16;
   public static final long TASK_SORT_IO_MB_DEFAULT = 128L;
   public static final String JOB_DIR_PATH = "jobs-data-path";
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
   /**
    * Directory of the test data.
   */
@@ -97,6 +107,7 @@ public class TestMRIntermediateDataEncryption {
   private static MiniDFSCluster dfsCluster;
   private static MiniMRClientCluster mrCluster;
   private static FileSystem fs;
+  private static FileChecksum checkSumReference;
   private static Path jobInputDirPath;
   private static long inputFileSize;
   /**
@@ -136,11 +147,7 @@ public static Collection getTestParameters() {
         {"testSingleReducer", 3, 1, false},
         {"testUberMode", 3, 1, true},
         {"testMultipleMapsPerNode", 8, 1, false},
-        // TODO: The following configuration is commented out until
-        // MAPREDUCE-7325 is fixed.
-        // Setting multiple reducers breaks LocalJobRunner causing the
-        // unit test to fail.
-        // {"testMultipleReducers", 2, 4, false}
+        {"testMultipleReducers", 2, 4, false}
     });
   }
 
@@ -171,6 +178,8 @@ public static void setupClass() throws Exception {
     // run the input generator job.
     Assert.assertEquals("Generating input should succeed", 0,
         generateInputTextFile());
+    // run the reference job
+    runReferenceJob();
   }
 
   @AfterClass
@@ -185,7 +194,7 @@ public static void tearDown() throws IOException {
     // make sure that generated input file is deleted
     final File textInputFile = new File(testRootDir, "input.txt");
     if (textInputFile.exists()) {
-      textInputFile.delete();
+      Assert.assertTrue(textInputFile.delete());
     }
   }
 
@@ -198,7 +207,7 @@ private static Configuration createBaseConfiguration() {
     // Set the jvm arguments to enable intermediate encryption.
     Configuration conf =
        MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
-    // Set the temp directories a subdir of the test directory.
+    // Set the temp directories to a subDir of the test directory.
     conf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(conf, testRootDir);
     conf.setLong("dfs.blocksize", BLOCK_SIZE_DEFAULT);
     return conf;
   }
@@ -207,7 +216,7 @@ private static Configuration createBaseConfiguration() {
   /**
    * Creates a thread safe BufferedWriter to be used among the task generators.
    * @return A synchronized BufferedWriter to the input file.
-   * @throws IOException
+   * @throws IOException if opening a new {@link FileWriter} fails.
    */
   private static synchronized BufferedWriter getTextInputWriter()
       throws IOException {
@@ -223,7 +232,7 @@ private static synchronized BufferedWriter getTextInputWriter()
    * It creates a total INPUT_GEN_NUM_THREADS future tasks.
    *
    * @return the result of the input generation. 0 for success.
-   * @throws Exception
+   * @throws Exception if an error occurs during the job I/O.
    */
   private static int generateInputTextFile() throws Exception {
     final File textInputFile = new File(testRootDir, "input.txt");
@@ -270,6 +279,118 @@ private static int generateInputTextFile() throws Exception {
     return 0;
   }
 
+  /**
+   * Runs a WordCount job with encryption disabled and stores the checksum of
+   * the output file.
+   * @throws Exception due to I/O errors.
+   */
+  private static void runReferenceJob() throws Exception {
+    final String jobRefLabel = "job-reference";
+    final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
+    if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
+      throw new IOException("Could not delete " + jobRefDirPath);
+    }
+    Assert.assertTrue(fs.mkdirs(jobRefDirPath));
+    Path jobRefOutputPath = new Path(jobRefDirPath, "out-dir");
+    Configuration referenceConf = new Configuration(commonConfig);
+    referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, false);
+    Job jobReference = runWordCountJob(jobRefLabel, jobRefOutputPath,
+        referenceConf, 4, 1);
+    Assert.assertTrue(jobReference.isSuccessful());
+    FileStatus[] fileStatusArr =
+        fs.listStatus(jobRefOutputPath,
+            new Utils.OutputFileUtils.OutputFilesFilter());
+    Assert.assertEquals(1, fileStatusArr.length);
+    checkSumReference = fs.getFileChecksum(fileStatusArr[0].getPath());
+    Assert.assertTrue(fs.delete(jobRefDirPath, true));
+  }
+
+  private static Job runWordCountJob(String postfixName, Path jOutputPath,
+      Configuration jConf, int mappers, int reducers) throws Exception {
+    Job job = Job.getInstance(jConf);
+    job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, mappers);
+    job.setJarByClass(TestMRIntermediateDataEncryption.class);
+    job.setJobName("mr-spill-" + postfixName);
+    // Mapper configuration
+    job.setMapperClass(TokenizerMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setCombinerClass(LongSumReducer.class);
+    FileInputFormat.setMinInputSplitSize(job,
+        (inputFileSize + mappers) / mappers);
+    // Reducer configuration
+    job.setReducerClass(LongSumReducer.class);
+    job.setNumReduceTasks(reducers);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+    // Set the IO paths for the job.
+    FileInputFormat.addInputPath(job, jobInputDirPath);
+    FileOutputFormat.setOutputPath(job, jOutputPath);
+    if (job.waitForCompletion(true)) {
+      FileStatus[] fileStatusArr =
+          fs.listStatus(jOutputPath,
+              new Utils.OutputFileUtils.OutputFilesFilter());
+      for (FileStatus fStatus : fileStatusArr) {
+        LOG.info("Job: {} .. Output file {} .. Size = {}",
+            postfixName, fStatus.getPath(), fStatus.getLen());
+      }
+    }
+    return job;
+  }
+
+  /**
+   * Compares the checksum of the output file to the
+   * checkSumReference.
+   * If the job has multiple reducers, the output files are combined by
+   * launching another job.
+   * @return true if the checksums are equal.
+   * @throws Exception if the output is missing or the combiner job fails.
+   */
+  private boolean validateJobOutput() throws Exception {
+    Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
+        fs.exists(jobOutputPath));
+    Path outputPath = jobOutputPath;
+    if (numReducers != 1) {
+      // combine the result into one file by running a combiner job
+      final String jobRefLabel = testTitleName + "-combine";
+      final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
+      if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
+        throw new IOException("Could not delete " + jobRefDirPath);
+      }
+      fs.mkdirs(jobRefDirPath);
+      outputPath = new Path(jobRefDirPath, "out-dir");
+      Configuration referenceConf = new Configuration(commonConfig);
+      referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
+          false);
+      Job combinerJob = Job.getInstance(referenceConf);
+      combinerJob.setJarByClass(TestMRIntermediateDataEncryption.class);
+      combinerJob.setJobName("mr-spill-" + jobRefLabel);
+      combinerJob.setMapperClass(CombinerJobMapper.class);
+      FileInputFormat.addInputPath(combinerJob, jobOutputPath);
+      // Reducer configuration
+      combinerJob.setReducerClass(LongSumReducer.class);
+      combinerJob.setNumReduceTasks(1);
+      combinerJob.setOutputKeyClass(Text.class);
+      combinerJob.setOutputValueClass(LongWritable.class);
+      // Set the IO paths for the job.
+      FileOutputFormat.setOutputPath(combinerJob, outputPath);
+      if (!combinerJob.waitForCompletion(true)) {
+        return false;
+      }
+      FileStatus[] fileStatusArr =
+          fs.listStatus(outputPath,
+              new Utils.OutputFileUtils.OutputFilesFilter());
+      LOG.info("Job-Combination: {} .. Output file {} .. Size = {}",
+          jobRefDirPath, fileStatusArr[0].getPath(), fileStatusArr[0].getLen());
+    }
+    // Get the output files of the job.
+    FileStatus[] fileStatusArr =
+        fs.listStatus(outputPath,
+            new Utils.OutputFileUtils.OutputFilesFilter());
+    FileChecksum jobFileChecksum =
+        fs.getFileChecksum(fileStatusArr[0].getPath());
+    return checkSumReference.equals(jobFileChecksum);
+  }
+
   @Before
   public void setup() throws Exception {
     LOG.info("Starting TestMRIntermediateDataEncryption#{}.......",
@@ -284,16 +405,16 @@ public void setup() throws Exception {
     config = new Configuration(commonConfig);
     config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
     config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
-    // set the configuration to make sure that we get spilled files
+    // Set the configuration to make sure that we get spilled files.
     long ioSortMb = TASK_SORT_IO_MB_DEFAULT;
     config.setLong(MRJobConfig.IO_SORT_MB, ioSortMb);
     long mapMb = Math.max(2 * ioSortMb, config.getInt(
         MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB));
-    // make sure the map tasks will spill to disk.
+    // Make sure the map tasks will spill to disk.
     config.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb);
     config.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
     config.setInt(MRJobConfig.NUM_MAPS, numMappers);
-    // max attempts have to be set to 1 when intermediate encryption is enabled.
+    // Max attempts have to be set to 1 when intermediate encryption is enabled.
config.setInt("mapreduce.map.maxattempts", 1); config.setInt("mapreduce.reduce.maxattempts", 1); } @@ -302,24 +423,6 @@ public void setup() throws Exception { public void testWordCount() throws Exception { LOG.info("........Starting main Job Driver #{} starting at {}.......", testTitleName, Time.formatTime(System.currentTimeMillis())); - Job job = Job.getInstance(config); - job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, numMappers); - job.setJarByClass(TestMRIntermediateDataEncryption.class); - job.setJobName("mr-spill-" + testTitleName); - // Mapper configuration - job.setMapperClass(TokenizerMapper.class); - job.setInputFormatClass(TextInputFormat.class); - job.setCombinerClass(LongSumReducer.class); - FileInputFormat.setMinInputSplitSize(job, - (inputFileSize + numMappers) / numMappers); - // Reducer configuration - job.setReducerClass(LongSumReducer.class); - job.setNumReduceTasks(numReducers); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - // Set the IO paths for the job. - FileInputFormat.addInputPath(job, jobInputDirPath); - FileOutputFormat.setOutputPath(job, jobOutputPath); SpillCallBackPathsFinder spillInjector = (SpillCallBackPathsFinder) IntermediateEncryptedStream .setSpillCBInjector(new SpillCallBackPathsFinder()); @@ -328,34 +431,36 @@ public void testWordCount() throws Exception { testTitleName)); try { long startTime = Time.monotonicNow(); - testSummary.append(String.format("%nJob %s ended at %s", + testSummary.append(String.format("%nJob %s started at %s", testTitleName, Time.formatTime(System.currentTimeMillis()))); - Assert.assertTrue(job.waitForCompletion(true)); + Job job = runWordCountJob(testTitleName, jobOutputPath, config, + numMappers, numReducers); + Assert.assertTrue(job.isSuccessful()); long endTime = Time.monotonicNow(); testSummary.append(String.format("%nJob %s ended at %s", job.getJobName(), Time.formatTime(System.currentTimeMillis()))); testSummary.append(String.format("%n\tThe job took %.3f seconds", (1.0 * (endTime - startTime)) / 1000)); - long spilledRecords = - job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue(); - Assert.assertFalse( - "The encrypted spilled files should not be empty.", - spillInjector.getEncryptedSpilledFiles().isEmpty()); - Assert.assertTrue("Spill records must be greater than 0", - spilledRecords > 0); - Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist", - fs.exists(jobOutputPath)); - Assert.assertTrue("Invalid access to spill file positions", - spillInjector.getInvalidSpillEntries().isEmpty()); - FileStatus[] fileStatus = + FileStatus[] fileStatusArr = fs.listStatus(jobOutputPath, new Utils.OutputFileUtils.OutputFilesFilter()); - for (FileStatus fStatus : fileStatus) { + for (FileStatus fStatus : fileStatusArr) { long fileSize = fStatus.getLen(); testSummary.append( String.format("%n\tOutput file %s: %d", fStatus.getPath(), fileSize)); } + // Validate the checksum of the output. + Assert.assertTrue(validateJobOutput()); + // Check intermediate files and spilling. 
+      long spilledRecords =
+          job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
+      Assert.assertTrue("Spill records must be greater than 0",
+          spilledRecords > 0);
+      Assert.assertFalse("The encrypted spilled files should not be empty.",
+          spillInjector.getEncryptedSpilledFiles().isEmpty());
+      Assert.assertTrue("Invalid access to spill file positions",
+          spillInjector.getInvalidSpillEntries().isEmpty());
     } finally {
       testSummary.append(spillInjector.getSpilledFileReport());
       LOG.info(testSummary.toString());
@@ -408,4 +513,21 @@ public void map(Object key, Text value,
       }
     }
   }
+
+  /**
+   * A Mapper that reads the output of WordCount and passes it on to the
+   * reducer. It is used to combine the output files of multiple reducers.
+   */
+  public static class CombinerJobMapper
+      extends Mapper<Object, Text, Text, LongWritable> {
+    private final LongWritable sum = new LongWritable(0);
+    private final Text word = new Text();
+    public void map(Object key, Text value,
+        Context context) throws IOException, InterruptedException {
+      String[] line = value.toString().split("\\s+");
+      sum.set(Long.parseLong(line[1]));
+      word.set(line[0]);
+      context.write(word, sum);
+    }
+  }
 }
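
Reviewer note (not part of the patch): the LocalFetcher reordering matters because
each reduce partition's segment of the local map output is encrypted separately, so
the decrypting wrapper has to start reading at that segment's own start offset rather
than at the head of the file; seeking after wrapping by startOffset plus the crypto
padding only lines up for the first partition (a single reducer). Below is a minimal
sketch of the corrected read ordering, assuming the wrapIfNecessary(Configuration,
FSDataInputStream, Path) signature seen in this diff; the class and method names are
made up, and the import location of IntermediateEncryptedStream is an assumption.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// Assumed package; adjust to wherever IntermediateEncryptedStream lives.
import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;

final class SegmentReadSketch {
  private SegmentReadSketch() {
  }

  /**
   * Opens one reduce partition's segment of a local map output file.
   * The raw stream is positioned at the segment start before the
   * decrypting wrapper is applied, so the wrapper consumes that
   * segment's own crypto header; no extra padding offset is needed.
   */
  static FSDataInputStream openSegment(Configuration job, Path mapOutputFile,
      long segmentStartOffset) throws IOException {
    FileSystem localFs = FileSystem.getLocal(job).getRaw();
    FSDataInputStream rawIn = localFs.open(mapOutputFile);
    // Seek first, on the raw (unwrapped) stream.
    rawIn.seek(segmentStartOffset);
    // Then wrap; the wrapper reads the segment's header at this position.
    return IntermediateEncryptedStream.wrapIfNecessary(job, rawIn,
        mapOutputFile);
  }
}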