From b897f6834ba69d443c3acd1fab52261c00d675a1 Mon Sep 17 00:00:00 2001 From: Eric E Payne Date: Wed, 29 Jan 2020 16:54:45 +0000 Subject: [PATCH] MAPREDUCE-7079: JobHistory#ServiceStop implementation is incorrect. Contributed by Ahmed Hussein (ahussein) --- .../hadoop/mapreduce/v2/hs/JobHistory.java | 31 +- .../TestMRIntermediateDataEncryption.java | 273 +++++++++++------- 2 files changed, 184 insertions(+), 120 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index a7d1370b168..7bb4b52f844 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -143,29 +143,32 @@ protected void serviceStart() throws Exception { protected int getInitDelaySecs() { return 30; } - + @Override protected void serviceStop() throws Exception { LOG.info("Stopping JobHistory"); if (scheduledExecutor != null) { LOG.info("Stopping History Cleaner/Move To Done"); scheduledExecutor.shutdown(); - boolean interrupted = false; - long currentTime = System.currentTimeMillis(); - while (!scheduledExecutor.isShutdown() - && System.currentTimeMillis() > currentTime + 1000l && !interrupted) { - try { - Thread.sleep(20); - } catch (InterruptedException e) { - interrupted = true; + int retryCnt = 50; + try { + while (!scheduledExecutor.awaitTermination(20, + TimeUnit.MILLISECONDS)) { + if (--retryCnt == 0) { + scheduledExecutor.shutdownNow(); + break; + } + } + } catch (InterruptedException iex) { + LOG.warn("HistoryCleanerService/move to done shutdown may not have " + + "succeeded, Forcing a shutdown", iex); + if (!scheduledExecutor.isShutdown()) { + scheduledExecutor.shutdownNow(); } } - if (!scheduledExecutor.isShutdown()) { - LOG.warn("HistoryCleanerService/move to done shutdown may not have " + - "succeeded, Forcing a shutdown"); - scheduledExecutor.shutdownNow(); - } + scheduledExecutor = null; } + // Stop the other services. if (storage != null && storage instanceof Service) { ((Service) storage).stop(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java index 28b22956732..fa8dacf6dd5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.mapred; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -26,12 +28,20 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.Assert.*; @@ -44,85 +54,126 @@ * framework's merge on the reduce side will merge the partitions created to * generate the final output which is sorted on the key. */ +@RunWith(Parameterized.class) public class TestMRIntermediateDataEncryption { + private static final Logger LOG = + LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class); + /** + * Use urandom to avoid the YarnChild process from hanging on low entropy + * systems. + */ + private static final String JVM_SECURITY_EGD_OPT = + "-Djava.security.egd=file:/dev/./urandom"; // Where MR job's input will reside. private static final Path INPUT_DIR = new Path("/test/input"); // Where output goes. private static final Path OUTPUT = new Path("/test/output"); + private static final int NUM_LINES = 1000; + private static MiniMRClientCluster mrCluster = null; + private static MiniDFSCluster dfsCluster = null; + private static FileSystem fs = null; + private static final int NUM_NODES = 2; - @Test - public void testSingleReducer() throws Exception { - doEncryptionTest(3, 1, 2, false); + private final String testTitle; + private final int numMappers; + private final int numReducers; + private final boolean isUber; + + /** + * List of arguments to run the JunitTest. + * @return + */ + @Parameterized.Parameters( + name = "{index}: TestMRIntermediateDataEncryption.{0} .. " + + "mappers:{1}, reducers:{2}, isUber:{3})") + public static Collection getTestParameters() { + return Arrays.asList(new Object[][]{ + {"testSingleReducer", 3, 1, false}, + {"testUberMode", 3, 1, true}, + {"testMultipleMapsPerNode", 8, 1, false}, + {"testMultipleReducers", 2, 4, false} + }); } - @Test - public void testUberMode() throws Exception { - doEncryptionTest(3, 1, 2, true); + /** + * Initialized the parametrized JUnit test. + * @param testName the name of the unit test to be executed. + * @param mappers number of mappers in the tests. + * @param reducers number of the reducers. + * @param uberEnabled boolean flag for isUber + */ + public TestMRIntermediateDataEncryption(String testName, int mappers, + int reducers, boolean uberEnabled) { + this.testTitle = testName; + this.numMappers = mappers; + this.numReducers = reducers; + this.isUber = uberEnabled; } - @Test - public void testMultipleMapsPerNode() throws Exception { - doEncryptionTest(8, 1, 2, false); + @BeforeClass + public static void setupClass() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); + + // Set the jvm arguments. + conf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, + JVM_SECURITY_EGD_OPT); + final String childJVMOpts = JVM_SECURITY_EGD_OPT + + " " + conf.get("mapred.child.java.opts", " "); + conf.set("mapred.child.java.opts", childJVMOpts); + + + // Start the mini-MR and mini-DFS clusters. + dfsCluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(NUM_NODES).build(); + mrCluster = + MiniMRClientClusterFactory.create( + TestMRIntermediateDataEncryption.class, NUM_NODES, conf); + mrCluster.start(); } - @Test - public void testMultipleReducers() throws Exception { - doEncryptionTest(2, 4, 2, false); + @AfterClass + public static void tearDown() throws IOException { + if (fs != null) { + fs.close(); + } + if (mrCluster != null) { + mrCluster.stop(); + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } - public void doEncryptionTest(int numMappers, int numReducers, int numNodes, - boolean isUber) throws Exception { - doEncryptionTest(numMappers, numReducers, numNodes, 1000, isUber); + @Before + public void setup() throws Exception { + LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", testTitle); + fs = dfsCluster.getFileSystem(); + if (fs.exists(INPUT_DIR) && !fs.delete(INPUT_DIR, true)) { + throw new IOException("Could not delete " + INPUT_DIR); + } + if (fs.exists(OUTPUT) && !fs.delete(OUTPUT, true)) { + throw new IOException("Could not delete " + OUTPUT); + } + // Generate input. + createInput(fs, numMappers, NUM_LINES); } - public void doEncryptionTest(int numMappers, int numReducers, int numNodes, - int numLines, boolean isUber) throws Exception { - MiniDFSCluster dfsCluster = null; - MiniMRClientCluster mrCluster = null; - FileSystem fileSystem = null; - try { - Configuration conf = new Configuration(); - // Start the mini-MR and mini-DFS clusters - - dfsCluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(numNodes).build(); - fileSystem = dfsCluster.getFileSystem(); - mrCluster = MiniMRClientClusterFactory.create(this.getClass(), - numNodes, conf); - // Generate input. - createInput(fileSystem, numMappers, numLines); - // Run the test. - runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem, - numMappers, numReducers, numLines, isUber); - } finally { - if (dfsCluster != null) { - dfsCluster.shutdown(); + @After + public void cleanup() throws IOException { + if (fs != null) { + if (fs.exists(OUTPUT)) { + fs.delete(OUTPUT, true); } - if (mrCluster != null) { - mrCluster.stop(); + if (fs.exists(INPUT_DIR)) { + fs.delete(INPUT_DIR, true); } } } - private void createInput(FileSystem fs, int numMappers, int numLines) throws Exception { - fs.delete(INPUT_DIR, true); - for (int i = 0; i < numMappers; i++) { - OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt")); - Writer writer = new OutputStreamWriter(os); - for (int j = 0; j < numLines; j++) { - // Create sorted key, value pairs. - int k = j + 1; - String formattedNumber = String.format("%09d", k); - writer.write(formattedNumber + " " + formattedNumber + "\n"); - } - writer.close(); - } - } - - private void runMergeTest(JobConf job, FileSystem fileSystem, int - numMappers, int numReducers, int numLines, boolean isUber) - throws Exception { - fileSystem.delete(OUTPUT, true); + @Test(timeout=600000) + public void testMerge() throws Exception { + JobConf job = new JobConf(mrCluster.getConfig()); job.setJobName("Test"); JobClient client = new JobClient(job); RunningJob submittedJob = null; @@ -134,43 +185,53 @@ private void runMergeTest(JobConf job, FileSystem fileSystem, int job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); - job.setMapperClass(MyMapper.class); - job.setPartitionerClass(MyPartitioner.class); + job.setMapperClass(TestMRIntermediateDataEncryption.MyMapper.class); + job.setPartitionerClass( + TestMRIntermediateDataEncryption.MyPartitioner.class); job.setOutputFormat(TextOutputFormat.class); job.setNumReduceTasks(numReducers); - job.setInt("mapreduce.map.maxattempts", 1); job.setInt("mapreduce.reduce.maxattempts", 1); - job.setInt("mapred.test.num_lines", numLines); - if (isUber) { - job.setBoolean("mapreduce.job.ubertask.enable", true); - } + job.setInt("mapred.test.num_lines", NUM_LINES); + job.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber); job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); - try { - submittedJob = client.submitJob(job); - try { - if (! client.monitorAndPrintJob(job, submittedJob)) { - throw new IOException("Job failed!"); - } - } catch(InterruptedException ie) { - Thread.currentThread().interrupt(); + submittedJob = client.submitJob(job); + submittedJob.waitForCompletion(); + assertTrue("The submitted job is completed", submittedJob.isComplete()); + assertTrue("The submitted job is successful", submittedJob.isSuccessful()); + verifyOutput(fs, numMappers, NUM_LINES); + client.close(); + // wait for short period to cool down. + Thread.sleep(1000); + } + + private void createInput(FileSystem filesystem, int mappers, int numLines) + throws Exception { + for (int i = 0; i < mappers; i++) { + OutputStream os = + filesystem.create(new Path(INPUT_DIR, "input_" + i + ".txt")); + Writer writer = new OutputStreamWriter(os); + for (int j = 0; j < numLines; j++) { + // Create sorted key, value pairs. + int k = j + 1; + String formattedNumber = String.format("%09d", k); + writer.write(formattedNumber + " " + formattedNumber + "\n"); } - } catch(IOException ioe) { - System.err.println("Job failed with: " + ioe); - } finally { - verifyOutput(submittedJob, fileSystem, numMappers, numLines); + writer.close(); + os.close(); } } - private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int numMappers, int numLines) - throws Exception { + private void verifyOutput(FileSystem fileSystem, + int mappers, int numLines) + throws Exception { FSDataInputStream dis = null; long numValidRecords = 0; long numInvalidRecords = 0; String prevKeyValue = "000000000"; Path[] fileList = - FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT, - new Utils.OutputFileUtils.OutputFilesFilter())); + FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT, + new Utils.OutputFileUtils.OutputFilesFilter())); for (Path outFile : fileList) { try { dis = fileSystem.open(outFile); @@ -197,7 +258,7 @@ private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int nu } } // Make sure we got all input records in the output in sorted order. - assertEquals((long)(numMappers * numLines), numValidRecords); + assertEquals((long)(mappers * numLines), numValidRecords); // Make sure there is no extraneous invalid record. assertEquals(0, numInvalidRecords); } @@ -207,30 +268,30 @@ private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int nu * in displayable form. */ public static class MyMapper extends MapReduceBase - implements Mapper { - private Text keyText; - private Text valueText; + implements Mapper { + private Text keyText; + private Text valueText; - public MyMapper() { - keyText = new Text(); - valueText = new Text(); - } - - @Override - public void map(LongWritable key, Text value, - OutputCollector output, - Reporter reporter) throws IOException { - String record = value.toString(); - int blankPos = record.indexOf(" "); - keyText.set(record.substring(0, blankPos)); - valueText.set(record.substring(blankPos+1)); - output.collect(keyText, valueText); - } - - public void close() throws IOException { - } + public MyMapper() { + keyText = new Text(); + valueText = new Text(); } + @Override + public void map(LongWritable key, Text value, + OutputCollector output, + Reporter reporter) throws IOException { + String record = value.toString(); + int blankPos = record.indexOf(" "); + keyText.set(record.substring(0, blankPos)); + valueText.set(record.substring(blankPos + 1)); + output.collect(keyText, valueText); + } + + public void close() throws IOException { + } + } + /** * Partitioner implementation to make sure that output is in total sorted * order. We basically route key ranges to different reducers such that @@ -255,12 +316,12 @@ public int getPartition(Text key, Text value, int numPartitions) { int keyValue = 0; try { keyValue = Integer.parseInt(key.toString()); - } catch(NumberFormatException nfe) { + } catch (NumberFormatException nfe) { keyValue = 0; } - int partitionNumber = (numPartitions*(Math.max(0, keyValue-1)))/job.getInt("mapred.test.num_lines", 10000); + int partitionNumber = (numPartitions * (Math.max(0, keyValue - 1))) / job + .getInt("mapred.test.num_lines", 10000); return partitionNumber; } } - }