MAPREDUCE-5233. Add methods that are changed or removed from JobControl.Job when compared to 1.x. This breaks 0.23.x users of one API in Job. Contributed by Mayank Bansal.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1485491 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
132e38ef47
commit
9b284150f1
|
@ -183,6 +183,10 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
1.x examples jar on top of YARN. This change breaks 0.23.x direct usages of
|
1.x examples jar on top of YARN. This change breaks 0.23.x direct usages of
|
||||||
ProgramDriver. (Zhijie Shen via vinodkv)
|
ProgramDriver. (Zhijie Shen via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-5233. Add methods that are changed or removed from JobControl.Job
|
||||||
|
when compared to 1.x. This breaks 0.23.x users of one API in Job. (Mayank
|
||||||
|
Bansal via vinodkv)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
|
@ -60,12 +60,11 @@ public class Job extends ControlledJob {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the mapred ID of this job as assigned by the
|
* @return the mapred ID of this job as assigned by the mapred framework.
|
||||||
* mapred framework.
|
|
||||||
*/
|
*/
|
||||||
public JobID getAssignedJobID() {
|
public JobID getAssignedJobID() {
|
||||||
org.apache.hadoop.mapreduce.JobID temp = super.getMapredJobID();
|
org.apache.hadoop.mapreduce.JobID temp = super.getMapredJobId();
|
||||||
if(temp == null) {
|
if (temp == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return JobID.downgrade(temp);
|
return JobID.downgrade(temp);
|
||||||
|
@ -126,6 +125,30 @@ public class Job extends ControlledJob {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a no-op function, Its a behavior change from 1.x We no more can
|
||||||
|
* change the state from job
|
||||||
|
*
|
||||||
|
* @param state
|
||||||
|
* the new state for this job.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
protected synchronized void setState(int state) {
|
||||||
|
// No-Op, we dont want to change the sate
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a job to this jobs' dependency list.
|
||||||
|
* Dependent jobs can only be added while a Job
|
||||||
|
* is waiting to run, not during or afterwards.
|
||||||
|
*
|
||||||
|
* @param dependingJob Job that this Job depends on.
|
||||||
|
* @return <tt>true</tt> if the Job was added.
|
||||||
|
*/
|
||||||
|
public synchronized boolean addDependingJob(Job dependingJob) {
|
||||||
|
return super.addDependingJob(dependingJob);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the job client of this job
|
* @return the job client of this job
|
||||||
*/
|
*/
|
||||||
|
@ -144,4 +167,25 @@ public class Job extends ControlledJob {
|
||||||
return JobControl.castToJobList(super.getDependentJobs());
|
return JobControl.castToJobList(super.getDependentJobs());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the mapred ID of this job as assigned by the mapred framework.
|
||||||
|
*/
|
||||||
|
public synchronized String getMapredJobID() {
|
||||||
|
if (super.getMapredJobId() != null) {
|
||||||
|
return super.getMapredJobId().toString();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is no-op method for backward compatibility. It's a behavior change
|
||||||
|
* from 1.x, we can not change job ids from job.
|
||||||
|
*
|
||||||
|
* @param mapredJobID
|
||||||
|
* the mapred job ID for this job.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
public synchronized void setMapredJobID(String mapredJobID) {
|
||||||
|
setAssignedJobID(JobID.forName(mapredJobID));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,10 +140,9 @@ public class ControlledJob {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the mapred ID of this job as assigned by the
|
* @return the mapred ID of this job as assigned by the mapred framework.
|
||||||
* mapred framework.
|
|
||||||
*/
|
*/
|
||||||
public JobID getMapredJobID() {
|
public synchronized JobID getMapredJobId() {
|
||||||
return this.job.getJobID();
|
return this.job.getJobID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,11 +22,14 @@ import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.JobID;
|
import org.apache.hadoop.mapred.JobID;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class performs unit test for Job/JobControl classes.
|
* This class performs unit test for Job/JobControl classes.
|
||||||
|
@ -191,10 +194,68 @@ public class TestJobControl extends junit.framework.TestCase {
|
||||||
theControl.stop();
|
theControl.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testJobState() throws Exception {
|
||||||
|
Job job_1 = getCopyJob();
|
||||||
|
JobControl jc = new JobControl("Test");
|
||||||
|
jc.addJob(job_1);
|
||||||
|
Assert.assertEquals(Job.WAITING, job_1.getState());
|
||||||
|
job_1.setState(Job.SUCCESS);
|
||||||
|
Assert.assertEquals(Job.WAITING, job_1.getState());
|
||||||
|
|
||||||
|
org.apache.hadoop.mapreduce.Job mockjob =
|
||||||
|
mock(org.apache.hadoop.mapreduce.Job.class);
|
||||||
|
org.apache.hadoop.mapreduce.JobID jid =
|
||||||
|
new org.apache.hadoop.mapreduce.JobID("test", 0);
|
||||||
|
when(mockjob.getJobID()).thenReturn(jid);
|
||||||
|
job_1.setJob(mockjob);
|
||||||
|
Assert.assertEquals("job_test_0000", job_1.getMapredJobID());
|
||||||
|
job_1.setMapredJobID("job_test_0001");
|
||||||
|
Assert.assertEquals("job_test_0000", job_1.getMapredJobID());
|
||||||
|
jc.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testAddingDependingJob() throws Exception {
|
||||||
|
Job job_1 = getCopyJob();
|
||||||
|
ArrayList<Job> dependingJobs = new ArrayList<Job>();
|
||||||
|
JobControl jc = new JobControl("Test");
|
||||||
|
jc.addJob(job_1);
|
||||||
|
Assert.assertEquals(Job.WAITING, job_1.getState());
|
||||||
|
Assert.assertTrue(job_1.addDependingJob(new Job(job_1.getJobConf(),
|
||||||
|
dependingJobs)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Job getCopyJob() throws Exception {
|
||||||
|
Configuration defaults = new Configuration();
|
||||||
|
FileSystem fs = FileSystem.get(defaults);
|
||||||
|
Path rootDataDir =
|
||||||
|
new Path(System.getProperty("test.build.data", "."),
|
||||||
|
"TestJobControlData");
|
||||||
|
Path indir = new Path(rootDataDir, "indir");
|
||||||
|
Path outdir_1 = new Path(rootDataDir, "outdir_1");
|
||||||
|
|
||||||
|
JobControlTestUtils.cleanData(fs, indir);
|
||||||
|
JobControlTestUtils.generateData(fs, indir);
|
||||||
|
|
||||||
|
JobControlTestUtils.cleanData(fs, outdir_1);
|
||||||
|
|
||||||
|
ArrayList<Job> dependingJobs = null;
|
||||||
|
|
||||||
|
ArrayList<Path> inPaths_1 = new ArrayList<Path>();
|
||||||
|
inPaths_1.add(indir);
|
||||||
|
JobConf jobConf_1 = JobControlTestUtils.createCopyJob(inPaths_1, outdir_1);
|
||||||
|
Job job_1 = new Job(jobConf_1, dependingJobs);
|
||||||
|
return job_1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 30000)
|
||||||
public void testJobControl() throws Exception {
|
public void testJobControl() throws Exception {
|
||||||
doJobControlTest();
|
doJobControlTest();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 30000)
|
||||||
public void testGetAssignedJobId() throws Exception {
|
public void testGetAssignedJobId() throws Exception {
|
||||||
JobConf jc = new JobConf();
|
JobConf jc = new JobConf();
|
||||||
Job j = new Job(jc);
|
Job j = new Job(jc);
|
||||||
|
|
|
@ -22,12 +22,15 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapred.HadoopTestCase;
|
import org.apache.hadoop.mapred.HadoopTestCase;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
|
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class performs unit test for Job/JobControl classes.
|
* This class performs unit test for Job/JobControl classes.
|
||||||
|
@ -187,4 +190,20 @@ public class TestMapReduceJobControl extends HadoopTestCase {
|
||||||
|
|
||||||
theControl.stop();
|
theControl.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testControlledJob() throws Exception {
|
||||||
|
Configuration conf = createJobConf();
|
||||||
|
cleanupData(conf);
|
||||||
|
Job job1 = MapReduceTestUtil.createCopyJob(conf, outdir_1, indir);
|
||||||
|
createDependencies(conf, job1);
|
||||||
|
while (cjob1.getJobState() != ControlledJob.State.RUNNING) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertNotNull(cjob1.getMapredJobId());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue