From 55a3a13afaacbe65c8fed4281107f71010b85e85 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 21 Feb 2012 05:31:33 +0000 Subject: [PATCH] MAPREDUCE-3798. Fixed failing TestJobCleanup.testCusomCleanup() and moved it to the maven build. Contributed by Ravi Prakash. svn merge --ignore-ancestry -c 1291606 ../../trunk git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1291607 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 2 +- .../apache/hadoop/mapred/TestJobCleanup.java | 291 +++++++++--------- 3 files changed, 158 insertions(+), 138 deletions(-) rename hadoop-mapreduce-project/{src/test/mapred => hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java}/org/apache/hadoop/mapred/TestJobCleanup.java (55%) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index dc0b3e5b8eb..0f13ea9d494 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -37,6 +37,9 @@ Release 0.23.2 - UNRELEASED MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when their EventHandlers get exceptions. (vinodkv) + + MAPREDUCE-3798. Fixed failing TestJobCleanup.testCusomCleanup() and moved it + to the maven build. (Ravi Prakash via vinodkv) Release 0.23.1 - 2012-02-17 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 2e17869f79c..0dddd66d59f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -381,6 +381,7 @@ public class MRAppMaster extends CompositeService { // this is the only job, so shut down the Appmaster // note in a workflow scenario, this may lead to creation of a new // job (FIXME?) + // Send job-end notification if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { try { LOG.info("Job end notification started for jobID : " @@ -407,7 +408,6 @@ public class MRAppMaster extends CompositeService { LOG.info("Calling stop for all the services"); stop(); - // Send job-end notification } catch (Throwable t) { LOG.warn("Graceful stop failed ", t); } diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java similarity index 55% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java index 3635892d639..6139fdb05f1 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java @@ -22,10 +22,8 @@ import java.io.DataOutputStream; import java.io.File; import java.io.IOException; -import junit.extensions.TestSetup; -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,64 +31,71 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; /** * A JUnit test to test Map-Reduce job cleanup. */ -public class TestJobCleanup extends TestCase { - private static String TEST_ROOT_DIR = - new File(System.getProperty("test.build.data", "/tmp") + "/" - + "test-job-cleanup").toString(); - private static final String CUSTOM_CLEANUP_FILE_NAME = - "_custom_cleanup"; - private static final String ABORT_KILLED_FILE_NAME = - "_custom_abort_killed"; - private static final String ABORT_FAILED_FILE_NAME = - "_custom_abort_failed"; +@SuppressWarnings("deprecation") +public class TestJobCleanup { + private static String TEST_ROOT_DIR = new File(System.getProperty( + "test.build.data", "/tmp") + "/" + "test-job-cleanup").toString(); + private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup"; + private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed"; + private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed"; private static FileSystem fileSys = null; private static MiniMRCluster mr = null; private static Path inDir = null; private static Path emptyInDir = null; private static int outDirs = 0; - - public static Test suite() { - TestSetup setup = new TestSetup(new TestSuite(TestJobCleanup.class)) { - protected void setUp() throws Exception { - JobConf conf = new JobConf(); - fileSys = FileSystem.get(conf); - fileSys.delete(new Path(TEST_ROOT_DIR), true); - conf.set("mapred.job.tracker.handler.count", "1"); - conf.set("mapred.job.tracker", "127.0.0.1:0"); - conf.set("mapred.job.tracker.http.address", "127.0.0.1:0"); - conf.set("mapred.task.tracker.http.address", "127.0.0.1:0"); - mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); - inDir = new Path(TEST_ROOT_DIR, "test-input"); - String input = "The quick brown fox\n" + "has many silly\n" - + "red fox sox\n"; - DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0)); - file.writeBytes(input); - file.close(); - emptyInDir = new Path(TEST_ROOT_DIR, "empty-input"); - fileSys.mkdirs(emptyInDir); - } - - protected void tearDown() throws Exception { - if (fileSys != null) { - fileSys.delete(new Path(TEST_ROOT_DIR), true); - fileSys.close(); - } - if (mr != null) { - mr.shutdown(); - } - } - }; - return setup; + private static Log LOG = LogFactory.getLog(TestJobCleanup.class); + + @BeforeClass + public static void setUp() throws IOException { + JobConf conf = new JobConf(); + fileSys = FileSystem.get(conf); + fileSys.delete(new Path(TEST_ROOT_DIR), true); + conf.set("mapred.job.tracker.handler.count", "1"); + conf.set("mapred.job.tracker", "127.0.0.1:0"); + conf.set("mapred.job.tracker.http.address", "127.0.0.1:0"); + conf.set("mapred.task.tracker.http.address", "127.0.0.1:0"); + conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, TEST_ROOT_DIR + + "/intermediate"); + conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter + .SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true"); + + mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); + inDir = new Path(TEST_ROOT_DIR, "test-input"); + String input = "The quick brown fox\n" + "has many silly\n" + + "red fox sox\n"; + DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0)); + file.writeBytes(input); + file.close(); + emptyInDir = new Path(TEST_ROOT_DIR, "empty-input"); + fileSys.mkdirs(emptyInDir); } - - /** - * Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)} - * making a _failed/_killed in the output folder + + @AfterClass + public static void tearDown() throws Exception { + if (fileSys != null) { + // fileSys.delete(new Path(TEST_ROOT_DIR), true); + fileSys.close(); + } + if (mr != null) { + mr.shutdown(); + } + } + + /** + * Committer with deprecated + * {@link FileOutputCommitter#cleanupJob(JobContext)} making a _failed/_killed + * in the output folder */ static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter { @Override @@ -101,31 +106,40 @@ public class TestJobCleanup extends TestCase { FileSystem fs = outputPath.getFileSystem(conf); fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close(); } + + @Override + public void commitJob(JobContext context) throws IOException { + cleanupJob(context); + } + + @Override + public void abortJob(JobContext context, int i) throws IOException { + cleanupJob(context); + } } - - /** + + /** * Committer with abort making a _failed/_killed in the output folder */ static class CommitterWithCustomAbort extends FileOutputCommitter { @Override - public void abortJob(JobContext context, int state) - throws IOException { - JobConf conf = context.getJobConf();; + public void abortJob(JobContext context, int state) throws IOException { + JobConf conf = context.getJobConf(); + ; Path outputPath = FileOutputFormat.getOutputPath(conf); FileSystem fs = outputPath.getFileSystem(conf); - String fileName = (state == JobStatus.FAILED) - ? TestJobCleanup.ABORT_FAILED_FILE_NAME - : TestJobCleanup.ABORT_KILLED_FILE_NAME; + String fileName = (state == JobStatus.FAILED) ? TestJobCleanup.ABORT_FAILED_FILE_NAME + : TestJobCleanup.ABORT_KILLED_FILE_NAME; fs.create(new Path(outputPath, fileName)).close(); } } - + private Path getNewOutputDir() { return new Path(TEST_ROOT_DIR, "output-" + outDirs++); } - - private void configureJob(JobConf jc, String jobName, int maps, int reds, - Path outDir) { + + private void configureJob(JobConf jc, String jobName, int maps, int reds, + Path outDir) { jc.setJobName(jobName); jc.setInputFormat(TextInputFormat.class); jc.setOutputKeyClass(LongWritable.class); @@ -137,36 +151,38 @@ public class TestJobCleanup extends TestCase { jc.setNumMapTasks(maps); jc.setNumReduceTasks(reds); } - + // run a job with 1 map and let it run to completion - private void testSuccessfulJob(String filename, - Class committer, String[] exclude) - throws IOException { + private void testSuccessfulJob(String filename, + Class committer, String[] exclude) + throws IOException { JobConf jc = mr.createJobConf(); Path outDir = getNewOutputDir(); configureJob(jc, "job with cleanup()", 1, 0, outDir); jc.setOutputCommitter(committer); - + JobClient jobClient = new JobClient(jc); RunningJob job = jobClient.submitJob(jc); JobID id = job.getID(); job.waitForCompletion(); - + + LOG.info("Job finished : " + job.isComplete()); Path testFile = new Path(outDir, filename); - assertTrue("Done file missing for job " + id, fileSys.exists(testFile)); - + assertTrue("Done file \"" + testFile + "\" missing for job " + id, + fileSys.exists(testFile)); + // check if the files from the missing set exists for (String ex : exclude) { Path file = new Path(outDir, ex); - assertFalse("File " + file + " should not be present for successful job " - + id, fileSys.exists(file)); + assertFalse("File " + file + " should not be present for successful job " + + id, fileSys.exists(file)); } } - + // run a job for which all the attempts simply fail. - private void testFailedJob(String fileName, - Class committer, String[] exclude) - throws IOException { + private void testFailedJob(String fileName, + Class committer, String[] exclude) + throws IOException { JobConf jc = mr.createJobConf(); Path outDir = getNewOutputDir(); configureJob(jc, "fail job with abort()", 1, 0, outDir); @@ -179,128 +195,129 @@ public class TestJobCleanup extends TestCase { RunningJob job = jobClient.submitJob(jc); JobID id = job.getID(); job.waitForCompletion(); - + if (fileName != null) { Path testFile = new Path(outDir, fileName); - assertTrue("File " + testFile + " missing for failed job " + id, - fileSys.exists(testFile)); + assertTrue("File " + testFile + " missing for failed job " + id, + fileSys.exists(testFile)); } - + // check if the files from the missing set exists for (String ex : exclude) { Path file = new Path(outDir, ex); assertFalse("File " + file + " should not be present for failed job " - + id, fileSys.exists(file)); + + id, fileSys.exists(file)); } } - + // run a job which gets stuck in mapper and kill it. private void testKilledJob(String fileName, - Class committer, String[] exclude) - throws IOException { + Class committer, String[] exclude) + throws IOException { JobConf jc = mr.createJobConf(); Path outDir = getNewOutputDir(); configureJob(jc, "kill job with abort()", 1, 0, outDir); // set the job to wait for long jc.setMapperClass(UtilsForTests.KillMapper.class); jc.setOutputCommitter(committer); - + JobClient jobClient = new JobClient(jc); RunningJob job = jobClient.submitJob(jc); JobID id = job.getID(); - JobInProgress jip = - mr.getJobTrackerRunner().getJobTracker().getJob(job.getID()); - + + Counters counters = job.getCounters(); + // wait for the map to be launched while (true) { - if (jip.runningMaps() == 1) { + if (counters.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS) == 1) { break; } + LOG.info("Waiting for a map task to be launched"); UtilsForTests.waitFor(100); + counters = job.getCounters(); } - + job.killJob(); // kill the job - + job.waitForCompletion(); // wait for the job to complete - + if (fileName != null) { Path testFile = new Path(outDir, fileName); - assertTrue("File " + testFile + " missing for job " + id, - fileSys.exists(testFile)); + assertTrue("File " + testFile + " missing for job " + id, + fileSys.exists(testFile)); } - + // check if the files from the missing set exists for (String ex : exclude) { Path file = new Path(outDir, ex); assertFalse("File " + file + " should not be present for killed job " - + id, fileSys.exists(file)); + + id, fileSys.exists(file)); } } - + /** * Test default cleanup/abort behavior - * + * * @throws IOException */ + @Test public void testDefaultCleanupAndAbort() throws IOException { // check with a successful job testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, - FileOutputCommitter.class, - new String[] {}); - + FileOutputCommitter.class, new String[] {}); + // check with a failed job - testFailedJob(null, - FileOutputCommitter.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); - + testFailedJob(null, FileOutputCommitter.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME }); + // check default abort job kill - testKilledJob(null, - FileOutputCommitter.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); + testKilledJob(null, FileOutputCommitter.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME }); } - + /** * Test if a failed job with custom committer runs the abort code. - * + * * @throws IOException */ + @Test public void testCustomAbort() throws IOException { // check with a successful job - testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, - CommitterWithCustomAbort.class, - new String[] {ABORT_FAILED_FILE_NAME, - ABORT_KILLED_FILE_NAME}); - + testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, + CommitterWithCustomAbort.class, new String[] { ABORT_FAILED_FILE_NAME, + ABORT_KILLED_FILE_NAME }); + // check with a failed job - testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, - ABORT_KILLED_FILE_NAME}); - + testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME, + ABORT_KILLED_FILE_NAME }); + // check with a killed job - testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, - ABORT_FAILED_FILE_NAME}); + testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME, + ABORT_FAILED_FILE_NAME }); } /** * Test if a failed job with custom committer runs the deprecated - * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api + * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api * compatibility testing. */ + @Test public void testCustomCleanup() throws IOException { // check with a successful job - testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, - CommitterWithCustomDeprecatedCleanup.class, - new String[] {}); - - // check with a failed job - testFailedJob(CUSTOM_CLEANUP_FILE_NAME, - CommitterWithCustomDeprecatedCleanup.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); - - // check with a killed job - testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME, - CommitterWithCustomDeprecatedCleanup.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); + testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, + CommitterWithCustomDeprecatedCleanup.class, + new String[] {}); + + // check with a failed job + testFailedJob(CUSTOM_CLEANUP_FILE_NAME, + CommitterWithCustomDeprecatedCleanup.class, + new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); + + // check with a killed job + testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME, + CommitterWithCustomDeprecatedCleanup.class, + new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); } }