MAPREDUCE-7325. Intermediate data encryption is broken in LocalJobRunner. Contributed by Ahmed Hussein

(cherry picked from commit ede490d131)
This commit is contained in:
Jim Brennan 2021-03-22 18:41:25 +00:00
parent b3532a7c22
commit 99c34a3576
2 changed files with 170 additions and 48 deletions

View File

@ -152,10 +152,10 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
FileSystem localFs = FileSystem.getLocal(job).getRaw(); FileSystem localFs = FileSystem.getLocal(job).getRaw();
FSDataInputStream inStream = localFs.open(mapOutputFileName); FSDataInputStream inStream = localFs.open(mapOutputFileName);
try { try {
inStream.seek(ir.startOffset);
inStream = inStream =
IntermediateEncryptedStream.wrapIfNecessary(job, inStream, IntermediateEncryptedStream.wrapIfNecessary(job, inStream,
mapOutputFileName); mapOutputFileName);
inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
mapOutput.shuffle(LOCALHOST, inStream, compressedLength, mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
decompressedLength, metrics, reporter); decompressedLength, metrics, reporter);
} finally { } finally {

View File

@ -44,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -75,9 +76,20 @@ import org.apache.hadoop.util.ToolRunner;
* mbs-per-map specifies the amount of data (in MBs) to generate per map. * mbs-per-map specifies the amount of data (in MBs) to generate per map.
* By default, this is twice the value of <code>mapreduce.task.io.sort.mb</code> * By default, this is twice the value of <code>mapreduce.task.io.sort.mb</code>
* <code>map-tasks</code> specifies the number of map tasks to run. * <code>map-tasks</code> specifies the number of map tasks to run.
* Steps of the unit test:
* 1- Generating random input text.
* 2- Run a job with encryption disabled. Get the checksum of the output file
* <code>checkSumReference</code>.
* 3- Run the job with encryption enabled.
* 4- Compare <code>checkSumReference</code> to the checksum of the job output.
* 5- If the job has multiple reducers, the test launches one final job to
* combine the output files into a single one.
* 6- Verify that the maps spilled files.
*/ */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class TestMRIntermediateDataEncryption { public class TestMRIntermediateDataEncryption {
public static final Logger LOG =
LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
/** /**
* The number of bytes generated by the input generator. * The number of bytes generated by the input generator.
*/ */
@ -86,8 +98,6 @@ public class TestMRIntermediateDataEncryption {
public static final int INPUT_GEN_NUM_THREADS = 16; public static final int INPUT_GEN_NUM_THREADS = 16;
public static final long TASK_SORT_IO_MB_DEFAULT = 128L; public static final long TASK_SORT_IO_MB_DEFAULT = 128L;
public static final String JOB_DIR_PATH = "jobs-data-path"; public static final String JOB_DIR_PATH = "jobs-data-path";
private static final Logger LOG =
LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
/** /**
* Directory of the test data. * Directory of the test data.
*/ */
@ -97,6 +107,7 @@ public class TestMRIntermediateDataEncryption {
private static MiniDFSCluster dfsCluster; private static MiniDFSCluster dfsCluster;
private static MiniMRClientCluster mrCluster; private static MiniMRClientCluster mrCluster;
private static FileSystem fs; private static FileSystem fs;
private static FileChecksum checkSumReference;
private static Path jobInputDirPath; private static Path jobInputDirPath;
private static long inputFileSize; private static long inputFileSize;
/** /**
@ -136,11 +147,7 @@ public class TestMRIntermediateDataEncryption {
{"testSingleReducer", 3, 1, false}, {"testSingleReducer", 3, 1, false},
{"testUberMode", 3, 1, true}, {"testUberMode", 3, 1, true},
{"testMultipleMapsPerNode", 8, 1, false}, {"testMultipleMapsPerNode", 8, 1, false},
// TODO: The following configuration is commented out until {"testMultipleReducers", 2, 4, false}
// MAPREDUCE-7325 is fixed.
// Setting multiple reducers breaks LocalJobRunner causing the
// unit test to fail.
// {"testMultipleReducers", 2, 4, false}
}); });
} }
@ -171,6 +178,8 @@ public class TestMRIntermediateDataEncryption {
// run the input generator job. // run the input generator job.
Assert.assertEquals("Generating input should succeed", 0, Assert.assertEquals("Generating input should succeed", 0,
generateInputTextFile()); generateInputTextFile());
// run the reference job
runReferenceJob();
} }
@AfterClass @AfterClass
@ -185,7 +194,7 @@ public class TestMRIntermediateDataEncryption {
// make sure that generated input file is deleted // make sure that generated input file is deleted
final File textInputFile = new File(testRootDir, "input.txt"); final File textInputFile = new File(testRootDir, "input.txt");
if (textInputFile.exists()) { if (textInputFile.exists()) {
textInputFile.delete(); Assert.assertTrue(textInputFile.delete());
} }
} }
@ -198,7 +207,7 @@ public class TestMRIntermediateDataEncryption {
// Set the jvm arguments to enable intermediate encryption. // Set the jvm arguments to enable intermediate encryption.
Configuration conf = Configuration conf =
MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null); MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
// Set the temp directories a subdir of the test directory. // Set the temp directories a subDir of the test directory.
conf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(conf, testRootDir); conf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(conf, testRootDir);
conf.setLong("dfs.blocksize", BLOCK_SIZE_DEFAULT); conf.setLong("dfs.blocksize", BLOCK_SIZE_DEFAULT);
return conf; return conf;
@ -207,7 +216,7 @@ public class TestMRIntermediateDataEncryption {
/** /**
* Creates a thread safe BufferedWriter to be used among the task generators. * Creates a thread safe BufferedWriter to be used among the task generators.
* @return A synchronized <code>BufferedWriter</code> to the input file. * @return A synchronized <code>BufferedWriter</code> to the input file.
* @throws IOException * @throws IOException opening a new {@link FileWriter}.
*/ */
private static synchronized BufferedWriter getTextInputWriter() private static synchronized BufferedWriter getTextInputWriter()
throws IOException { throws IOException {
@ -223,7 +232,7 @@ public class TestMRIntermediateDataEncryption {
* It creates a total <code>INPUT_GEN_NUM_THREADS</code> future tasks. * It creates a total <code>INPUT_GEN_NUM_THREADS</code> future tasks.
* *
* @return the result of the input generation. 0 for success. * @return the result of the input generation. 0 for success.
* @throws Exception * @throws Exception during the I/O of job.
*/ */
private static int generateInputTextFile() throws Exception { private static int generateInputTextFile() throws Exception {
final File textInputFile = new File(testRootDir, "input.txt"); final File textInputFile = new File(testRootDir, "input.txt");
@ -270,6 +279,118 @@ public class TestMRIntermediateDataEncryption {
return 0; return 0;
} }
/**
* Runs a WordCount job with encryption disabled and stores the checksum of
* the output file.
* @throws Exception due to I/O errors.
*/
private static void runReferenceJob() throws Exception {
final String jobRefLabel = "job-reference";
final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
throw new IOException("Could not delete " + jobRefDirPath);
}
Assert.assertTrue(fs.mkdirs(jobRefDirPath));
Path jobRefOutputPath = new Path(jobRefDirPath, "out-dir");
Configuration referenceConf = new Configuration(commonConfig);
referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, false);
Job jobReference = runWordCountJob(jobRefLabel, jobRefOutputPath,
referenceConf, 4, 1);
Assert.assertTrue(jobReference.isSuccessful());
FileStatus[] fileStatusArr =
fs.listStatus(jobRefOutputPath,
new Utils.OutputFileUtils.OutputFilesFilter());
Assert.assertEquals(1, fileStatusArr.length);
checkSumReference = fs.getFileChecksum(fileStatusArr[0].getPath());
Assert.assertTrue(fs.delete(jobRefDirPath, true));
}
private static Job runWordCountJob(String postfixName, Path jOutputPath,
Configuration jConf, int mappers, int reducers) throws Exception {
Job job = Job.getInstance(jConf);
job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, mappers);
job.setJarByClass(TestMRIntermediateDataEncryption.class);
job.setJobName("mr-spill-" + postfixName);
// Mapper configuration
job.setMapperClass(TokenizerMapper.class);
job.setInputFormatClass(TextInputFormat.class);
job.setCombinerClass(LongSumReducer.class);
FileInputFormat.setMinInputSplitSize(job,
(inputFileSize + mappers) / mappers);
// Reducer configuration
job.setReducerClass(LongSumReducer.class);
job.setNumReduceTasks(reducers);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// Set the IO paths for the job.
FileInputFormat.addInputPath(job, jobInputDirPath);
FileOutputFormat.setOutputPath(job, jOutputPath);
if (job.waitForCompletion(true)) {
FileStatus[] fileStatusArr =
fs.listStatus(jOutputPath,
new Utils.OutputFileUtils.OutputFilesFilter());
for (FileStatus fStatus : fileStatusArr) {
LOG.info("Job: {} .. Output file {} .. Size = {}",
postfixName, fStatus.getPath(), fStatus.getLen());
}
}
return job;
}
/**
* Compares the checksum of the output file to the
* <code>checkSumReference</code>.
* If the job has a multiple reducers, the output files are combined by
* launching another job.
* @return true if the checksums are equal.
* @throws Exception if the output is missing or the combiner job fails.
*/
private boolean validateJobOutput() throws Exception {
Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
fs.exists(jobOutputPath));
Path outputPath = jobOutputPath;
if (numReducers != 1) {
// combine the result into one file by running a combiner job
final String jobRefLabel = testTitleName + "-combine";
final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
throw new IOException("Could not delete " + jobRefDirPath);
}
fs.mkdirs(jobRefDirPath);
outputPath = new Path(jobRefDirPath, "out-dir");
Configuration referenceConf = new Configuration(commonConfig);
referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
false);
Job combinerJob = Job.getInstance(referenceConf);
combinerJob.setJarByClass(TestMRIntermediateDataEncryption.class);
combinerJob.setJobName("mr-spill-" + jobRefLabel);
combinerJob.setMapperClass(CombinerJobMapper.class);
FileInputFormat.addInputPath(combinerJob, jobOutputPath);
// Reducer configuration
combinerJob.setReducerClass(LongSumReducer.class);
combinerJob.setNumReduceTasks(1);
combinerJob.setOutputKeyClass(Text.class);
combinerJob.setOutputValueClass(LongWritable.class);
// Set the IO paths for the job.
FileOutputFormat.setOutputPath(combinerJob, outputPath);
if (!combinerJob.waitForCompletion(true)) {
return false;
}
FileStatus[] fileStatusArr =
fs.listStatus(outputPath,
new Utils.OutputFileUtils.OutputFilesFilter());
LOG.info("Job-Combination: {} .. Output file {} .. Size = {}",
jobRefDirPath, fileStatusArr[0].getPath(), fileStatusArr[0].getLen());
}
// Get the output files of the job.
FileStatus[] fileStatusArr =
fs.listStatus(outputPath,
new Utils.OutputFileUtils.OutputFilesFilter());
FileChecksum jobFileChecksum =
fs.getFileChecksum(fileStatusArr[0].getPath());
return checkSumReference.equals(jobFileChecksum);
}
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", LOG.info("Starting TestMRIntermediateDataEncryption#{}.......",
@ -284,16 +405,16 @@ public class TestMRIntermediateDataEncryption {
config = new Configuration(commonConfig); config = new Configuration(commonConfig);
config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber); config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F); config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
// set the configuration to make sure that we get spilled files // Set the configuration to make sure that we get spilled files.
long ioSortMb = TASK_SORT_IO_MB_DEFAULT; long ioSortMb = TASK_SORT_IO_MB_DEFAULT;
config.setLong(MRJobConfig.IO_SORT_MB, ioSortMb); config.setLong(MRJobConfig.IO_SORT_MB, ioSortMb);
long mapMb = Math.max(2 * ioSortMb, config.getInt(MRJobConfig.MAP_MEMORY_MB, long mapMb = Math.max(2 * ioSortMb, config.getInt(MRJobConfig.MAP_MEMORY_MB,
MRJobConfig.DEFAULT_MAP_MEMORY_MB)); MRJobConfig.DEFAULT_MAP_MEMORY_MB));
// make sure the map tasks will spill to disk. // Make sure the map tasks will spill to disk.
config.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb); config.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb);
config.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m"); config.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
config.setInt(MRJobConfig.NUM_MAPS, numMappers); config.setInt(MRJobConfig.NUM_MAPS, numMappers);
// max attempts have to be set to 1 when intermediate encryption is enabled. // Max attempts have to be set to 1 when intermediate encryption is enabled.
config.setInt("mapreduce.map.maxattempts", 1); config.setInt("mapreduce.map.maxattempts", 1);
config.setInt("mapreduce.reduce.maxattempts", 1); config.setInt("mapreduce.reduce.maxattempts", 1);
} }
@ -302,24 +423,6 @@ public class TestMRIntermediateDataEncryption {
public void testWordCount() throws Exception { public void testWordCount() throws Exception {
LOG.info("........Starting main Job Driver #{} starting at {}.......", LOG.info("........Starting main Job Driver #{} starting at {}.......",
testTitleName, Time.formatTime(System.currentTimeMillis())); testTitleName, Time.formatTime(System.currentTimeMillis()));
Job job = Job.getInstance(config);
job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, numMappers);
job.setJarByClass(TestMRIntermediateDataEncryption.class);
job.setJobName("mr-spill-" + testTitleName);
// Mapper configuration
job.setMapperClass(TokenizerMapper.class);
job.setInputFormatClass(TextInputFormat.class);
job.setCombinerClass(LongSumReducer.class);
FileInputFormat.setMinInputSplitSize(job,
(inputFileSize + numMappers) / numMappers);
// Reducer configuration
job.setReducerClass(LongSumReducer.class);
job.setNumReduceTasks(numReducers);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// Set the IO paths for the job.
FileInputFormat.addInputPath(job, jobInputDirPath);
FileOutputFormat.setOutputPath(job, jobOutputPath);
SpillCallBackPathsFinder spillInjector = SpillCallBackPathsFinder spillInjector =
(SpillCallBackPathsFinder) IntermediateEncryptedStream (SpillCallBackPathsFinder) IntermediateEncryptedStream
.setSpillCBInjector(new SpillCallBackPathsFinder()); .setSpillCBInjector(new SpillCallBackPathsFinder());
@ -328,34 +431,36 @@ public class TestMRIntermediateDataEncryption {
testTitleName)); testTitleName));
try { try {
long startTime = Time.monotonicNow(); long startTime = Time.monotonicNow();
testSummary.append(String.format("%nJob %s ended at %s", testSummary.append(String.format("%nJob %s started at %s",
testTitleName, Time.formatTime(System.currentTimeMillis()))); testTitleName, Time.formatTime(System.currentTimeMillis())));
Assert.assertTrue(job.waitForCompletion(true)); Job job = runWordCountJob(testTitleName, jobOutputPath, config,
numMappers, numReducers);
Assert.assertTrue(job.isSuccessful());
long endTime = Time.monotonicNow(); long endTime = Time.monotonicNow();
testSummary.append(String.format("%nJob %s ended at %s", testSummary.append(String.format("%nJob %s ended at %s",
job.getJobName(), Time.formatTime(System.currentTimeMillis()))); job.getJobName(), Time.formatTime(System.currentTimeMillis())));
testSummary.append(String.format("%n\tThe job took %.3f seconds", testSummary.append(String.format("%n\tThe job took %.3f seconds",
(1.0 * (endTime - startTime)) / 1000)); (1.0 * (endTime - startTime)) / 1000));
long spilledRecords = FileStatus[] fileStatusArr =
job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
Assert.assertFalse(
"The encrypted spilled files should not be empty.",
spillInjector.getEncryptedSpilledFiles().isEmpty());
Assert.assertTrue("Spill records must be greater than 0",
spilledRecords > 0);
Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
fs.exists(jobOutputPath));
Assert.assertTrue("Invalid access to spill file positions",
spillInjector.getInvalidSpillEntries().isEmpty());
FileStatus[] fileStatus =
fs.listStatus(jobOutputPath, fs.listStatus(jobOutputPath,
new Utils.OutputFileUtils.OutputFilesFilter()); new Utils.OutputFileUtils.OutputFilesFilter());
for (FileStatus fStatus : fileStatus) { for (FileStatus fStatus : fileStatusArr) {
long fileSize = fStatus.getLen(); long fileSize = fStatus.getLen();
testSummary.append( testSummary.append(
String.format("%n\tOutput file %s: %d", String.format("%n\tOutput file %s: %d",
fStatus.getPath(), fileSize)); fStatus.getPath(), fileSize));
} }
// Validate the checksum of the output.
Assert.assertTrue(validateJobOutput());
// Check intermediate files and spilling.
long spilledRecords =
job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
Assert.assertTrue("Spill records must be greater than 0",
spilledRecords > 0);
Assert.assertFalse("The encrypted spilled files should not be empty.",
spillInjector.getEncryptedSpilledFiles().isEmpty());
Assert.assertTrue("Invalid access to spill file positions",
spillInjector.getInvalidSpillEntries().isEmpty());
} finally { } finally {
testSummary.append(spillInjector.getSpilledFileReport()); testSummary.append(spillInjector.getSpilledFileReport());
LOG.info(testSummary.toString()); LOG.info(testSummary.toString());
@ -408,4 +513,21 @@ public class TestMRIntermediateDataEncryption {
} }
} }
} }
/**
* A Mapper that reads the output of WordCount passing it to the reducer.
* It is used to combine the output of multiple reducer jobs.
*/
public static class CombinerJobMapper
extends Mapper<Object, Text, Text, LongWritable> {
private final LongWritable sum = new LongWritable(0);
private final Text word = new Text();
public void map(Object key, Text value,
Context context) throws IOException, InterruptedException {
String[] line = value.toString().split("\\s+");
sum.set(Long.parseLong(line[1]));
word.set(line[0]);
context.write(word, sum);
}
}
} }