MAPREDUCE-1788. o.a.h.mapreduce.Job shouldn't make a copy of the JobConf. (Arun Murthy via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1172171 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-09-18 02:50:30 +00:00
parent dafa8f7a77
commit e1acb1222d
18 changed files with 145 additions and 64 deletions

View File

@ -1352,6 +1352,9 @@ Release 0.22.0 - Unreleased
MAPREDUCE-2994. Fixed a bug in ApplicationID parsing that affects RM
UI. (Devaraj K via vinodkv)
MAPREDUCE-1788. o.a.h.mapreduce.Job shouldn't make a copy of the JobConf.
(Arun Murthy via mahadev)
NEW FEATURES
MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.

View File

@ -535,7 +535,7 @@ public class JobClient extends CLI {
try {
conf.setBooleanIfUnset("mapred.mapper.new-api", false);
conf.setBooleanIfUnset("mapred.reducer.new-api", false);
Job job = Job.getInstance(cluster, conf);
Job job = Job.getInstance(conf);
job.submit();
return new NetworkedJob(job);
} catch (InterruptedException ie) {

View File

@ -112,7 +112,7 @@ public class Cluster {
private Job[] getJobs(JobStatus[] stats) throws IOException {
List<Job> jobs = new ArrayList<Job>();
for (JobStatus stat : stats) {
jobs.add(new Job(this, stat, new JobConf(stat.getJobFile())));
jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
}
return jobs.toArray(new Job[0]);
}
@ -152,7 +152,7 @@ public class Cluster {
public Job getJob(JobID jobId) throws IOException, InterruptedException {
JobStatus status = client.getJobStatus(jobId);
if (status != null) {
return new Job(this, status, new JobConf(status.getJobFile()));
return Job.getInstance(this, status, new JobConf(status.getJobFile()));
}
return null;
}

View File

@ -31,22 +31,22 @@ import java.net.URLConnection;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import javax.security.auth.login.LoginException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
/**
@ -130,7 +130,7 @@ public class Job extends JobContextImpl implements JobContext {
@Deprecated
public Job(Configuration conf) throws IOException {
this(new Cluster(conf), conf);
this(new JobConf(conf));
}
@Deprecated
@ -139,18 +139,13 @@ public class Job extends JobContextImpl implements JobContext {
setJobName(jobName);
}
Job(Cluster cluster) throws IOException {
this(cluster, new Configuration());
}
Job(Cluster cluster, Configuration conf) throws IOException {
Job(JobConf conf) throws IOException {
super(conf, null);
this.cluster = cluster;
this.cluster = null;
}
Job(Cluster cluster, JobStatus status,
Configuration conf) throws IOException {
this(cluster, conf);
Job(JobStatus status, JobConf conf) throws IOException {
this(conf);
setJobID(status.getJobID());
this.status = status;
state = JobState.RUNNING;
@ -170,7 +165,13 @@ public class Job extends JobContextImpl implements JobContext {
}
/**
* Creates a new {@link Job} with no particular {@link Cluster} .
* Creates a new {@link Job} with no particular {@link Cluster} and a
* given {@link Configuration}.
*
* The <code>Job</code> makes a copy of the <code>Configuration</code> so
* that any necessary internal modifications do not reflect on the incoming
* parameter.
*
* A Cluster will be created from the conf parameter only when it's needed.
*
* @param conf the configuration
@ -179,13 +180,18 @@ public class Job extends JobContextImpl implements JobContext {
*/
public static Job getInstance(Configuration conf) throws IOException {
// create with a null Cluster
return new Job(null, conf);
JobConf jobConf = new JobConf(conf);
return new Job(jobConf);
}
/**
* Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
* A Cluster will be created from the conf parameter only when it's needed.
*
* The <code>Job</code> makes a copy of the <code>Configuration</code> so
* that any necessary internal modifications do not reflect on the incoming
* parameter.
*
* @param conf the configuration
* @return the {@link Job} , with no connection to a cluster yet.
@ -194,25 +200,92 @@ public class Job extends JobContextImpl implements JobContext {
public static Job getInstance(Configuration conf, String jobName)
throws IOException {
// create with a null Cluster
Job result = new Job(null, conf);
Job result = getInstance(conf);
result.setJobName(jobName);
return result;
}
public static Job getInstance(Cluster cluster) throws IOException {
return new Job(cluster);
/**
 * Creates a new {@link Job} with no particular {@link Cluster} and given
 * {@link Configuration} and {@link JobStatus}.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter (the copy is made via the <code>new JobConf(conf)</code> below).
 *
 * @param status job status to seed the new instance with
 * @param conf job configuration; copied, never mutated
 * @return the {@link Job} , with no connection to a cluster yet.
 * @throws IOException if the underlying {@link Job} construction fails
 */
public static Job getInstance(JobStatus status, Configuration conf)
throws IOException {
return new Job(status, new JobConf(conf));
}
/**
 * Creates a new {@link Job} with no particular {@link Cluster}.
 * A Cluster will be created from the conf parameter only when it's needed.
 *
 * The <code>Job</code> makes a copy of the <code>Configuration</code> so
 * that any necessary internal modifications do not reflect on the incoming
 * parameter.
 *
 * @param ignored cluster argument; retained for source compatibility only
 *                and not used by this method
 * @return the {@link Job} , with no connection to a cluster yet.
 * @throws IOException if the underlying {@link Job} construction fails
 * @deprecated Use {@link #getInstance()}
 */
@Deprecated
public static Job getInstance(Cluster ignored) throws IOException {
return getInstance();
}
public static Job getInstance(Cluster cluster, Configuration conf)
/**
* Creates a new {@link Job} with no particular {@link Cluster} and given
* {@link Configuration}.
* A Cluster will be created from the conf parameter only when it's needed.
*
* The <code>Job</code> makes a copy of the <code>Configuration</code> so
* that any necessary internal modifications do not reflect on the incoming
* parameter.
*
* @param ignored
* @param conf job configuration
* @return the {@link Job} , with no connection to a cluster yet.
* @throws IOException
* @deprecated Use {@link #getInstance(Configuration)}
*/
@Deprecated
public static Job getInstance(Cluster ignored, Configuration conf)
throws IOException {
return new Job(cluster, conf);
return getInstance(conf);
}
/**
* Creates a new {@link Job} with no particular {@link Cluster} and given
* {@link Configuration} and {@link JobStatus}.
* A Cluster will be created from the conf parameter only when it's needed.
*
* The <code>Job</code> makes a copy of the <code>Configuration</code> so
* that any necessary internal modifications do not reflect on the incoming
* parameter.
*
* @param cluster cluster
* @param status job status
* @param conf job configuration
* @return the {@link Job} , with no connection to a cluster yet.
* @throws IOException
*/
@Private
public static Job getInstance(Cluster cluster, JobStatus status,
Configuration conf) throws IOException {
return new Job(cluster, status, conf);
Job job = getInstance(status, conf);
job.setCluster(cluster);
return job;
}
private void ensureState(JobState state) throws IllegalStateException {
if (state != this.state) {
throw new IllegalStateException("Job in state "+ this.state +
@ -254,6 +327,10 @@ public class Job extends JobContextImpl implements JobContext {
updateStatus();
return status;
}
/** Replaces the cached {@link JobStatus} for this job. */
private void setStatus(JobStatus status) {
this.status = status;
}
/**
* Returns the current state of the Job.
@ -354,6 +431,12 @@ public class Job extends JobContextImpl implements JobContext {
return status.isRetired();
}
/**
 * Binds this job to the given {@link Cluster}.
 * Only for mocks in unit tests.
 */
@Private
private void setCluster(Cluster cluster) {
this.cluster = cluster;
}
/**
* Dump stats to screen.
*/
@ -1055,6 +1138,12 @@ public class Job extends JobContextImpl implements JobContext {
return cluster != null;
}
/**
 * Creates the {@link JobSubmitter} used when submitting this job.
 * Only for mocking via unit tests.
 *
 * @param fs file system handed to the submitter
 * @param submitClient client protocol handed to the submitter
 * @return a new {@link JobSubmitter} instance
 * @throws IOException if the submitter cannot be constructed
 */
@Private
public JobSubmitter getJobSubmitter(FileSystem fs,
ClientProtocol submitClient) throws IOException {
return new JobSubmitter(fs, submitClient);
}
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
@ -1064,8 +1153,8 @@ public class Job extends JobContextImpl implements JobContext {
ensureState(JobState.DEFINE);
setUseNewAPI();
connect();
final JobSubmitter submitter = new JobSubmitter(cluster.getFileSystem(),
cluster.getClient());
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
@ -1114,7 +1203,7 @@ public class Job extends JobContextImpl implements JobContext {
throws IOException, InterruptedException {
String lastReport = null;
Job.TaskStatusFilter filter;
Configuration clientConf = cluster.getConf();
Configuration clientConf = getConfiguration();
filter = Job.getTaskOutputFilter(clientConf);
JobID jobId = getJobID();
LOG.info("Running job: " + jobId);

View File

@ -319,7 +319,6 @@ class JobSubmitter {
* @throws InterruptedException
* @throws IOException
*/
@SuppressWarnings("unchecked")
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
@ -60,7 +61,11 @@ public class JobContextImpl implements JobContext {
protected final Credentials credentials;
public JobContextImpl(Configuration conf, JobID jobId) {
this.conf = new org.apache.hadoop.mapred.JobConf(conf);
if (conf instanceof JobConf) {
this.conf = (JobConf)conf;
} else {
this.conf = new JobConf(conf);
}
this.jobId = jobId;
this.credentials = this.conf.getCredentials();
try {

View File

@ -215,7 +215,7 @@ public class CLI extends Configured implements Tool {
// Submit the request
try {
if (submitJobFile != null) {
Job job = Job.getInstance(cluster, new JobConf(submitJobFile));
Job job = Job.getInstance(new JobConf(submitJobFile));
job.submit();
System.out.println("Created job " + job.getJobID());
exitCode = 0;

View File

@ -64,7 +64,7 @@ public class TestJobMonitorAndPrint extends TestCase {
when(cluster.getClient()).thenReturn(clientProtocol);
JobStatus jobStatus = new JobStatus(new JobID("job_000", 1), 0f, 0f, 0f, 0f,
State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
job = new Job(cluster, jobStatus, conf);
job = Job.getInstance(cluster, jobStatus, conf);
job = spy(job);
}

View File

@ -223,23 +223,10 @@ public class YARNRunner implements ClientProtocol {
throw new YarnException(e);
}
// XXX Remove
Path submitJobDir = new Path(jobSubmitDir);
FileContext defaultFS = FileContext.getFileContext(conf);
Path submitJobFile =
defaultFS.makeQualified(JobSubmissionFiles.getJobConfPath(submitJobDir));
FSDataInputStream in = defaultFS.open(submitJobFile);
conf.addResource(in);
// ---
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
// XXX Remove
in.close();
// ---
// Submit to ResourceManager
ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

View File

@ -139,8 +139,8 @@ public class TestClientRedirect {
Cluster cluster = new Cluster(conf);
org.apache.hadoop.mapreduce.JobID jobID =
new org.apache.hadoop.mapred.JobID("201103121733", 1);
org.apache.hadoop.mapreduce.Counters counters = cluster.getJob(jobID)
.getCounters();
org.apache.hadoop.mapreduce.Counters counters =
cluster.getJob(jobID).getCounters();
validateCounters(counters);
Assert.assertTrue(amContact);

View File

@ -74,7 +74,7 @@ public class TeraChecksum extends Configured implements Tool {
}
public int run(String[] args) throws Exception {
Job job = Job.getInstance(new Cluster(getConf()), getConf());
Job job = Job.getInstance(getConf());
if (args.length != 2) {
usage();
return 2;

View File

@ -280,7 +280,7 @@ public class TeraGen extends Configured implements Tool {
*/
public int run(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
Job job = Job.getInstance(new Cluster(getConf()), getConf());
Job job = Job.getInstance(getConf());
if (args.length != 2) {
usage();
return 2;

View File

@ -280,7 +280,7 @@ public class TeraSort extends Configured implements Tool {
public int run(String[] args) throws Exception {
LOG.info("starting");
Job job = Job.getInstance(new Cluster(getConf()), getConf());
Job job = Job.getInstance(getConf());
Path inputDir = new Path(args[0]);
Path outputDir = new Path(args[1]);
boolean useSimplePartitioner = getUseSimplePartitioner(job);

View File

@ -157,7 +157,7 @@ public class TeraValidate extends Configured implements Tool {
}
public int run(String[] args) throws Exception {
Job job = Job.getInstance(new Cluster(getConf()), getConf());
Job job = Job.getInstance(getConf());
if (args.length != 2) {
usage();
return 1;

View File

@ -208,7 +208,7 @@ public class TestClusterStatus extends TestCase {
Configuration conf = mr.createJobConf();
conf.setInt(JobContext.NUM_MAPS, 1);
Job job = Job.getInstance(cluster, conf);
Job job = Job.getInstance(conf);
job.setNumReduceTasks(1);
job.setSpeculativeExecution(false);
job.setJobSetupCleanupNeeded(false);

View File

@ -199,7 +199,7 @@ public class TestJobCounters {
public static Job createJob() throws IOException {
final Configuration conf = new Configuration();
final Job baseJob = Job.getInstance(new Cluster(conf), conf);
final Job baseJob = Job.getInstance(conf);
baseJob.setOutputKeyClass(Text.class);
baseJob.setOutputValueClass(IntWritable.class);
baseJob.setMapperClass(NewMapTokenizer.class);

View File

@ -298,7 +298,7 @@ public class TestMapCollection {
throws Exception {
Configuration conf = new Configuration();
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
Job job = Job.getInstance(new Cluster(conf), conf);
Job job = Job.getInstance(conf);
conf = job.getConfiguration();
conf.setInt(MRJobConfig.IO_SORT_MB, ioSortMB);
conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT, Float.toString(spillPer));
@ -409,7 +409,7 @@ public class TestMapCollection {
// no writes into the serialization buffer
Configuration conf = new Configuration();
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
Job job = Job.getInstance(new Cluster(conf), conf);
Job job = Job.getInstance(conf);
conf = job.getConfiguration();
conf.setInt(MRJobConfig.IO_SORT_MB, 1);
// 2^20 * spill = 14336 bytes available post-spill, at most 896 meta
@ -427,7 +427,7 @@ public class TestMapCollection {
public void testLargeRecConcurrent() throws Exception {
Configuration conf = new Configuration();
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
Job job = Job.getInstance(new Cluster(conf), conf);
Job job = Job.getInstance(conf);
conf = job.getConfiguration();
conf.setInt(MRJobConfig.IO_SORT_MB, 1);
conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT, Float.toString(.986328125f));
@ -496,7 +496,7 @@ public class TestMapCollection {
public void testRandom() throws Exception {
Configuration conf = new Configuration();
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
Job job = Job.getInstance(new Cluster(conf), conf);
Job job = Job.getInstance(conf);
conf = job.getConfiguration();
conf.setInt(MRJobConfig.IO_SORT_MB, 1);
conf.setClass("test.mapcollection.class", RandomFactory.class,
@ -517,7 +517,7 @@ public class TestMapCollection {
public void testRandomCompress() throws Exception {
Configuration conf = new Configuration();
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
Job job = Job.getInstance(new Cluster(conf), conf);
Job job = Job.getInstance(conf);
conf = job.getConfiguration();
conf.setInt(MRJobConfig.IO_SORT_MB, 1);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);

View File

@ -234,12 +234,11 @@ public class TestTrackerDistributedCacheManager extends TestCase {
}
TrackerDistributedCacheManager manager =
new FakeTrackerDistributedCacheManager(conf);
Cluster cluster = new Cluster(conf);
String userName = getJobOwnerName();
File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
// Configures a job with a regular file
Job job1 = Job.getInstance(cluster, conf);
Job job1 = Job.getInstance(conf);
job1.setUser(userName);
job1.addCacheFile(secondCacheFile.toUri());
Configuration conf1 = job1.getConfiguration();
@ -262,7 +261,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
createPrivateTempFile(thirdCacheFile);
// Configures another job with three regular files.
Job job2 = Job.getInstance(cluster, conf);
Job job2 = Job.getInstance(conf);
job2.setUser(userName);
// add a file that would get failed to localize
job2.addCacheFile(firstCacheFile.toUri());
@ -366,7 +365,6 @@ public class TestTrackerDistributedCacheManager extends TestCase {
throws IOException, LoginException, InterruptedException {
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf, taskController);
Cluster cluster = new Cluster(conf);
String userName = getJobOwnerName();
File workDir = new File(TEST_ROOT_DIR, "workdir");
Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
@ -376,7 +374,7 @@ public class TestTrackerDistributedCacheManager extends TestCase {
createPrivateTempFile(cacheFile);
}
Job job1 = Job.getInstance(cluster, conf);
Job job1 = Job.getInstance(conf);
job1.setUser(userName);
job1.addCacheFile(cacheFile.toUri());
Configuration conf1 = job1.getConfiguration();