MAPREDUCE-4355. Add JobStatus getJobStatus(JobID) to JobClient. (kkambatl via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1353757 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-06-25 21:55:44 +00:00
parent 5b36106934
commit d9bbd5997d
4 changed files with 70 additions and 16 deletions

View File

@ -129,6 +129,8 @@ Branch-2 ( Unreleased changes )
NEW FEATURES NEW FEATURES
MAPREDUCE-4355. Add JobStatus getJobStatus(JobID) to JobClient. (kkambatl via tucu)
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-4146. Support limits on task status string length and number of MAPREDUCE-4146. Support limits on task status string length and number of

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import static junit.framework.Assert.assertNotNull; import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
@ -28,7 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.Test; import org.junit.Test;
public class TestJobClientGetJob { public class TestJobClient {
private static Path TEST_ROOT_DIR = private static Path TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp")); new Path(System.getProperty("test.build.data","/tmp"));
@ -45,7 +46,7 @@ public class TestJobClientGetJob {
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Test @Test
public void testGetRunningJobFromJobClient() throws Exception { public void testGetRunningJob() throws Exception {
JobConf conf = new JobConf(); JobConf conf = new JobConf();
conf.set("mapreduce.framework.name", "local"); conf.set("mapreduce.framework.name", "local");
FileInputFormat.addInputPath(conf, createTempFile("in", "hello")); FileInputFormat.addInputPath(conf, createTempFile("in", "hello"));
@ -60,4 +61,21 @@ public class TestJobClientGetJob {
assertNotNull("New running job", newRunningJob); assertNotNull("New running job", newRunningJob);
} }
@SuppressWarnings("deprecation")
@Test
public void testGetJobStatus() throws Exception {
JobConf conf = new JobConf();
conf.set("mapreduce.framework.name", "local");
FileInputFormat.addInputPath(conf, createTempFile("in", "hello"));
Path outputDir = new Path(TEST_ROOT_DIR, getClass().getSimpleName());
outputDir.getFileSystem(conf).delete(outputDir, true);
FileOutputFormat.setOutputPath(conf, outputDir);
JobClient jc = new JobClient(conf);
RunningJob runningJob = jc.submitJob(conf);
assertNotNull("Running job", runningJob);
JobID jobid = runningJob.getID();
JobStatus jobStatus = jc.getJobStatus(jobid);
assertNotNull("New running job", jobStatus);
assertEquals("Equal JobIDs", jobid, jobStatus.getJobID());
}
} }

View File

@ -620,6 +620,15 @@ public class JobClient extends CLI {
} }
} }
private JobStatus getJobStatusUsingCluster(final JobID jobId)
throws IOException, InterruptedException {
return clientUgi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException {
return JobStatus.downgrade(cluster.getJobStatus(jobId));
}
});
}
private Job getJobUsingCluster(final JobID jobid) throws IOException, private Job getJobUsingCluster(final JobID jobid) throws IOException,
InterruptedException { InterruptedException {
return clientUgi.doAs(new PrivilegedExceptionAction<Job>() { return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
@ -628,28 +637,40 @@ public class JobClient extends CLI {
} }
}); });
} }
/** /**
* Get an {@link RunningJob} object to track an ongoing job. Returns * Get {@link JobStatus} of a job. Returns null if the id does not correspond
* null if the id does not correspond to any known job. * to any known job.
* *
* @param jobid the jobid of the job. * @param jobid
* @return the {@link RunningJob} handle to track the job, null if the * the jobid of the job.
* @return the {@link JobStatus} object to retrieve the job stats, null if the
* <code>jobid</code> doesn't correspond to any known job. * <code>jobid</code> doesn't correspond to any known job.
* @throws IOException * @throws IOException
*/ */
public RunningJob getJob(final JobID jobid) throws IOException { public JobStatus getJobStatus(JobID jobId) throws IOException {
try { try {
return getJobStatusUsingCluster(jobId);
Job job = getJobUsingCluster(jobid);
if (job != null) {
JobStatus status = JobStatus.downgrade(job.getStatus());
if (status != null) {
return new NetworkedJob(status, cluster);
}
}
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new IOException(ie); throw new IOException(ie);
} }
}
/**
* Get an {@link RunningJob} object to track an ongoing job. Returns null if
* the id does not correspond to any known job.
*
* @param jobid
* the jobid of the job.
* @return the {@link RunningJob} handle to track the job, null if the
* <code>jobid</code> doesn't correspond to any known job.
* @throws IOException
*/
public RunningJob getJob(JobID jobId) throws IOException {
JobStatus status = getJobStatus(jobId);
if (status != null) {
return new NetworkedJob(status, cluster);
}
return null; return null;
} }

View File

@ -172,6 +172,19 @@ public class Cluster {
return fs; return fs;
} }
/**
* Get JobStatus corresponding to jobId.
*
* @param jobId
* @return object of {@link JobStatus}
* @throws IOException
* @throws InterruptedException
*/
public JobStatus getJobStatus(JobID jobId) throws IOException,
InterruptedException {
return client.getJobStatus(jobId);
}
/** /**
* Get job corresponding to jobid. * Get job corresponding to jobid.
* *
@ -181,7 +194,7 @@ public class Cluster {
* @throws InterruptedException * @throws InterruptedException
*/ */
public Job getJob(JobID jobId) throws IOException, InterruptedException { public Job getJob(JobID jobId) throws IOException, InterruptedException {
JobStatus status = client.getJobStatus(jobId); JobStatus status = getJobStatus(jobId);
if (status != null) { if (status != null) {
return Job.getInstance(this, status, new JobConf(status.getJobFile())); return Job.getInstance(this, status, new JobConf(status.getJobFile()));
} }