diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b916907172a..ad0a49f7697 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -20,6 +20,8 @@ Release 2.0.1-alpha - UNRELEASED MAPREDUCE-4253. Tests for mapreduce-client-core are lying under mapreduce-client-jobclient (Tsuyoshi Ozawa via harsh) + MAPREDUCE-4355. Add RunningJob.getJobStatus() (kkambatl via tucu) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java index 1e3f952303e..7e49fa77cfe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java @@ -441,6 +441,14 @@ public String getFailureInfo() throws IOException { } } + @Override + public JobStatus getJobStatus() throws IOException { + try { + return JobStatus.downgrade(job.getStatus()); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + } } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RunningJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RunningJob.java index 5a11fa876ea..5873ace4de6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RunningJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RunningJob.java @@ -149,8 +149,16 @@ public interface RunningJob { public int getJobState() throws IOException; /** - * Kill the running job. Blocks until all job tasks have been - * killed as well. If the job is no longer running, it simply returns. + * Returns a snapshot of the current status, {@link JobStatus}, of the Job. + * Need to call again for latest information. + * + * @throws IOException + */ + public JobStatus getJobStatus() throws IOException; + + /** + * Kill the running job. Blocks until all job tasks have been killed as well. + * If the job is no longer running, it simply returns. * * @throws IOException */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java index b6565e28956..37aa7b24a3c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java @@ -18,16 +18,31 @@ package org.apache.hadoop.mapred; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import java.util.List; +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.lib.IdentityMapper; +import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapreduce.Job; import org.junit.Test; -import static org.mockito.Mockito.*; public class TestNetworkedJob { + private static String TEST_ROOT_DIR = new File(System.getProperty( + "test.build.data", "/tmp")).toURI().toString().replace(' ', '+'); + private static Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local"); + private static Path inFile = new Path(testDir, "in"); + private static Path outDir = new Path(testDir, "out"); @SuppressWarnings("deprecation") @Test @@ -41,4 +56,53 @@ public void testGetNullCounters() throws Exception { //verification verify(mockJob).getCounters(); } + + @Test + public void testGetJobStatus() throws IOException, InterruptedException, + ClassNotFoundException { + MiniMRClientCluster mr = null; + FileSystem fileSys = null; + + try { + mr = MiniMRClientClusterFactory.create(this.getClass(), 2, + new Configuration()); + + JobConf job = new JobConf(mr.getConfig()); + + fileSys = FileSystem.get(job); + fileSys.delete(testDir, true); + FSDataOutputStream out = fileSys.create(inFile, true); + out.writeBytes("This is a test file"); + out.close(); + + FileInputFormat.setInputPaths(job, inFile); + FileOutputFormat.setOutputPath(job, outDir); + + job.setInputFormat(TextInputFormat.class); + job.setOutputFormat(TextOutputFormat.class); + + job.setMapperClass(IdentityMapper.class); + job.setReducerClass(IdentityReducer.class); + job.setNumReduceTasks(0); + + JobClient client = new JobClient(mr.getConfig()); + RunningJob rj = client.submitJob(job); + JobID jobId = rj.getID(); + + // The following asserts read JobStatus twice and ensure the returned + // JobStatus objects correspond to the same Job. + assertEquals("Expected matching JobIDs", jobId, client.getJob(jobId) + .getJobStatus().getJobID()); + assertEquals("Expected matching startTimes", rj.getJobStatus() + .getStartTime(), client.getJob(jobId).getJobStatus() + .getStartTime()); + } finally { + if (fileSys != null) { + fileSys.delete(testDir, true); + } + if (mr != null) { + mr.stop(); + } + } + } }