MAPREDUCE-1788. o.a.h.mapreduce.Job shouldn't make a copy of the JobConf. (Arun Murthy via mahadev) - Merging r1172171 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1172172 13f79535-47bb-0310-9956-ffa450edef68
Author: Mahadev Konar
Date:   2011-09-18 02:52:44 +00:00
Parent: c09246b2e8
Commit: 8c99dc8435

18 changed files with 145 additions and 64 deletions

@@ -1320,6 +1320,9 @@ Release 0.22.0 - Unreleased
     MAPREDUCE-2994. Fixed a bug in ApplicationID parsing that affects RM
     UI. (Devaraj K via vinodkv)
 
+    MAPREDUCE-1788. o.a.h.mapreduce.Job shouldn't make a copy of the JobConf.
+    (Arun Murthy via mahadev)
+
     NEW FEATURES
 
     MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.

@@ -535,7 +535,7 @@ public RunningJob submitJob(JobConf conf) throws FileNotFoundException,
     try {
       conf.setBooleanIfUnset("mapred.mapper.new-api", false);
       conf.setBooleanIfUnset("mapred.reducer.new-api", false);
-      Job job = Job.getInstance(cluster, conf);
+      Job job = Job.getInstance(conf);
       job.submit();
       return new NetworkedJob(job);
     } catch (InterruptedException ie) {
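
Note: the net effect of this hunk is that the old-API submission path no
longer needs a Cluster at construction time. A minimal sketch of the
resulting pattern (variable names illustrative; the lazy-Cluster behavior
is per the Job javadoc added later in this commit):

    JobConf conf = new JobConf();
    conf.setBooleanIfUnset("mapred.mapper.new-api", false);
    conf.setBooleanIfUnset("mapred.reducer.new-api", false);
    // Job.getInstance(conf) copies conf into a JobConf; a Cluster is
    // created only when submit() actually needs one.
    Job job = Job.getInstance(conf);
    job.submit();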

@@ -112,7 +112,7 @@ public synchronized void close() throws IOException {
   private Job[] getJobs(JobStatus[] stats) throws IOException {
     List<Job> jobs = new ArrayList<Job>();
     for (JobStatus stat : stats) {
-      jobs.add(new Job(this, stat, new JobConf(stat.getJobFile())));
+      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
     }
     return jobs.toArray(new Job[0]);
   }

@@ -152,7 +152,7 @@ public FileSystem run() throws IOException, InterruptedException {
   public Job getJob(JobID jobId) throws IOException, InterruptedException {
     JobStatus status = client.getJobStatus(jobId);
     if (status != null) {
-      return new Job(this, status, new JobConf(status.getJobFile()));
+      return Job.getInstance(this, status, new JobConf(status.getJobFile()));
     }
     return null;
   }
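
Note: these call sites switch from the package-private Job constructor to
the new @Private factory introduced later in this commit. In effect the
factory does the equivalent of this sketch (wiring illustrative; see
Job.getInstance(Cluster, JobStatus, Configuration) below):

    Job job = Job.getInstance(status, conf); // copies conf into a JobConf
    job.setCluster(cluster);                 // private test/wiring hook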

@@ -31,22 +31,22 @@
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 
-import javax.security.auth.login.LoginException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -130,7 +130,7 @@ public Job() throws IOException {
   @Deprecated
   public Job(Configuration conf) throws IOException {
-    this(new Cluster(conf), conf);
+    this(new JobConf(conf));
   }
 
   @Deprecated
@@ -139,18 +139,13 @@ public Job(Configuration conf, String jobName) throws IOException {
     setJobName(jobName);
   }
 
-  Job(Cluster cluster) throws IOException {
-    this(cluster, new Configuration());
-  }
-
-  Job(Cluster cluster, Configuration conf) throws IOException {
+  Job(JobConf conf) throws IOException {
     super(conf, null);
-    this.cluster = cluster;
+    this.cluster = null;
   }
 
-  Job(Cluster cluster, JobStatus status,
-      Configuration conf) throws IOException {
-    this(cluster, conf);
+  Job(JobStatus status, JobConf conf) throws IOException {
+    this(conf);
     setJobID(status.getJobID());
     this.status = status;
     state = JobState.RUNNING;
@@ -170,7 +165,13 @@ public static Job getInstance() throws IOException {
   }
 
   /**
-   * Creates a new {@link Job} with no particular {@link Cluster} .
+   * Creates a new {@link Job} with no particular {@link Cluster} and a
+   * given {@link Configuration}.
+   *
+   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
+   * that any necessary internal modifications do not reflect on the incoming
+   * parameter.
+   *
    * A Cluster will be created from the conf parameter only when it's needed.
    *
    * @param conf the configuration
@@ -179,13 +180,18 @@ public static Job getInstance() throws IOException {
    */
   public static Job getInstance(Configuration conf) throws IOException {
     // create with a null Cluster
-    return new Job(null, conf);
+    JobConf jobConf = new JobConf(conf);
+    return new Job(jobConf);
   }
 
   /**
    * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
    * A Cluster will be created from the conf parameter only when it's needed.
+   *
+   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
+   * that any necessary internal modifications do not reflect on the incoming
+   * parameter.
    *
    * @param conf the configuration
    * @return the {@link Job} , with no connection to a cluster yet.
@@ -194,25 +200,92 @@ public static Job getInstance(Configuration conf) throws IOException {
   public static Job getInstance(Configuration conf, String jobName)
       throws IOException {
     // create with a null Cluster
-    Job result = new Job(null, conf);
+    Job result = getInstance(conf);
     result.setJobName(jobName);
     return result;
   }
 
-  public static Job getInstance(Cluster cluster) throws IOException {
-    return new Job(cluster);
-  }
-
-  public static Job getInstance(Cluster cluster, Configuration conf)
-      throws IOException {
-    return new Job(cluster, conf);
-  }
-
+  /**
+   * Creates a new {@link Job} with no particular {@link Cluster} and given
+   * {@link Configuration} and {@link JobStatus}.
+   * A Cluster will be created from the conf parameter only when it's needed.
+   *
+   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
+   * that any necessary internal modifications do not reflect on the incoming
+   * parameter.
+   *
+   * @param status job status
+   * @param conf job configuration
+   * @return the {@link Job} , with no connection to a cluster yet.
+   * @throws IOException
+   */
+  public static Job getInstance(JobStatus status, Configuration conf)
+      throws IOException {
+    return new Job(status, new JobConf(conf));
+  }
+
+  /**
+   * Creates a new {@link Job} with no particular {@link Cluster}.
+   * A Cluster will be created from the conf parameter only when it's needed.
+   *
+   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
+   * that any necessary internal modifications do not reflect on the incoming
+   * parameter.
+   *
+   * @param ignored
+   * @return the {@link Job} , with no connection to a cluster yet.
+   * @throws IOException
+   * @deprecated Use {@link #getInstance()}
+   */
+  @Deprecated
+  public static Job getInstance(Cluster ignored) throws IOException {
+    return getInstance();
+  }
+
+  /**
+   * Creates a new {@link Job} with no particular {@link Cluster} and given
+   * {@link Configuration}.
+   * A Cluster will be created from the conf parameter only when it's needed.
+   *
+   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
+   * that any necessary internal modifications do not reflect on the incoming
+   * parameter.
+   *
+   * @param ignored
+   * @param conf job configuration
+   * @return the {@link Job} , with no connection to a cluster yet.
+   * @throws IOException
+   * @deprecated Use {@link #getInstance(Configuration)}
+   */
+  @Deprecated
+  public static Job getInstance(Cluster ignored, Configuration conf)
+      throws IOException {
+    return getInstance(conf);
+  }
+
+  /**
+   * Creates a new {@link Job} with no particular {@link Cluster} and given
+   * {@link Configuration} and {@link JobStatus}.
+   * A Cluster will be created from the conf parameter only when it's needed.
+   *
+   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
+   * that any necessary internal modifications do not reflect on the incoming
+   * parameter.
+   *
+   * @param cluster cluster
+   * @param status job status
+   * @param conf job configuration
+   * @return the {@link Job} , with no connection to a cluster yet.
+   * @throws IOException
+   */
+  @Private
   public static Job getInstance(Cluster cluster, JobStatus status,
       Configuration conf) throws IOException {
-    return new Job(cluster, status, conf);
+    Job job = getInstance(status, conf);
+    job.setCluster(cluster);
+    return job;
   }
 
   private void ensureState(JobState state) throws IllegalStateException {
     if (state != this.state) {
       throw new IllegalStateException("Job in state "+ this.state +
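
Note: for callers migrating off the deprecated Cluster-taking overloads, a
hedged before/after sketch (names illustrative):

    // Before (now deprecated): a Cluster had to exist at creation time.
    Cluster cluster = new Cluster(conf);
    Job job = Job.getInstance(cluster, conf);

    // After: the Cluster argument is dropped; Job copies conf into a
    // JobConf and connects to a cluster lazily, on first use.
    Job job2 = Job.getInstance(conf);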
@@ -254,6 +327,10 @@ public JobStatus getStatus() throws IOException, InterruptedException {
     updateStatus();
     return status;
   }
+
+  private void setStatus(JobStatus status) {
+    this.status = status;
+  }
 
   /**
    * Returns the current state of the Job.
@@ -354,6 +431,12 @@ public boolean isRetired() throws IOException, InterruptedException {
     return status.isRetired();
   }
 
+  /** Only for mocks in unit tests. */
+  @Private
+  private void setCluster(Cluster cluster) {
+    this.cluster = cluster;
+  }
+
   /**
    * Dump stats to screen.
    */
@@ -1055,6 +1138,12 @@ boolean isConnected() {
     return cluster != null;
   }
 
+  /** Only for mocking via unit tests. */
+  @Private
+  public JobSubmitter getJobSubmitter(FileSystem fs,
+      ClientProtocol submitClient) throws IOException {
+    return new JobSubmitter(fs, submitClient);
+  }
+
   /**
    * Submit the job to the cluster and return immediately.
    * @throws IOException
@@ -1064,8 +1153,8 @@ public void submit()
     ensureState(JobState.DEFINE);
     setUseNewAPI();
     connect();
-    final JobSubmitter submitter = new JobSubmitter(cluster.getFileSystem(),
-        cluster.getClient());
+    final JobSubmitter submitter =
+        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
     status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
       public JobStatus run() throws IOException, InterruptedException,
           ClassNotFoundException {
@@ -1114,7 +1203,7 @@ public boolean monitorAndPrintJob()
       throws IOException, InterruptedException {
     String lastReport = null;
     Job.TaskStatusFilter filter;
-    Configuration clientConf = cluster.getConf();
+    Configuration clientConf = getConfiguration();
     filter = Job.getTaskOutputFilter(clientConf);
     JobID jobId = getJobID();
     LOG.info("Running job: " + jobId);

@@ -319,7 +319,6 @@ private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
    * @throws InterruptedException
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   JobStatus submitJobInternal(Job job, Cluster cluster)
       throws ClassNotFoundException, InterruptedException, IOException {

@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;

@@ -60,7 +61,11 @@ public class JobContextImpl implements JobContext {
   protected final Credentials credentials;
 
   public JobContextImpl(Configuration conf, JobID jobId) {
-    this.conf = new org.apache.hadoop.mapred.JobConf(conf);
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf)conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
     this.jobId = jobId;
     this.credentials = this.conf.getCredentials();
     try {
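
Note: this instanceof check is the other half of the fix.
Job.getInstance(Configuration) already takes exactly one defensive copy
(new JobConf(conf)), so JobContextImpl must not clone it a second time.
The net semantics, as a sketch (the key "k" is illustrative):

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);      // single copy: new JobConf(conf)
    job.getConfiguration().set("k", "v"); // mutates the Job's own JobConf
    assert conf.get("k") == null;         // caller's conf stays untouched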

@@ -215,7 +215,7 @@ public int run(String[] argv) throws Exception {
     // Submit the request
     try {
       if (submitJobFile != null) {
-        Job job = Job.getInstance(cluster, new JobConf(submitJobFile));
+        Job job = Job.getInstance(new JobConf(submitJobFile));
         job.submit();
         System.out.println("Created job " + job.getJobID());
         exitCode = 0;

@@ -64,7 +64,7 @@ public void setUp() throws IOException {
     when(cluster.getClient()).thenReturn(clientProtocol);
     JobStatus jobStatus = new JobStatus(new JobID("job_000", 1), 0f, 0f, 0f, 0f,
         State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
-    job = new Job(cluster, jobStatus, conf);
+    job = Job.getInstance(cluster, jobStatus, conf);
     job = spy(job);
   }
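
Note: this test relies on the @Private hooks added above; a test that also
needs to stub submission could go one step further. A Mockito sketch
(mockSubmitter and this stubbing are hypothetical, not part of this commit):

    JobSubmitter mockSubmitter = mock(JobSubmitter.class);
    doReturn(mockSubmitter).when(job)
        .getJobSubmitter(any(FileSystem.class), any(ClientProtocol.class));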

@@ -223,23 +223,10 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
       throw new YarnException(e);
     }
 
-    // XXX Remove
-    Path submitJobDir = new Path(jobSubmitDir);
-    FileContext defaultFS = FileContext.getFileContext(conf);
-    Path submitJobFile =
-        defaultFS.makeQualified(JobSubmissionFiles.getJobConfPath(submitJobDir));
-    FSDataInputStream in = defaultFS.open(submitJobFile);
-    conf.addResource(in);
-    // ---
-
     // Construct necessary information to start the MR AM
     ApplicationSubmissionContext appContext =
         createApplicationSubmissionContext(conf, jobSubmitDir, ts);
 
-    // XXX Remove
-    in.close();
-    // ---
-
     // Submit to ResourceManager
     ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

@@ -139,8 +139,8 @@ public void testRedirect() throws Exception {
     Cluster cluster = new Cluster(conf);
     org.apache.hadoop.mapreduce.JobID jobID =
         new org.apache.hadoop.mapred.JobID("201103121733", 1);
-    org.apache.hadoop.mapreduce.Counters counters = cluster.getJob(jobID)
-        .getCounters();
+    org.apache.hadoop.mapreduce.Counters counters =
+        cluster.getJob(jobID).getCounters();
     validateCounters(counters);
     Assert.assertTrue(amContact);

@@ -74,7 +74,7 @@ private static void usage() throws IOException {
   }
 
   public int run(String[] args) throws Exception {
-    Job job = Job.getInstance(new Cluster(getConf()), getConf());
+    Job job = Job.getInstance(getConf());
     if (args.length != 2) {
       usage();
       return 2;

@@ -280,7 +280,7 @@ private static long parseHumanLong(String str) {
    */
   public int run(String[] args)
       throws IOException, InterruptedException, ClassNotFoundException {
-    Job job = Job.getInstance(new Cluster(getConf()), getConf());
+    Job job = Job.getInstance(getConf());
     if (args.length != 2) {
       usage();
       return 2;

@@ -280,7 +280,7 @@ public static void setOutputReplication(Job job, int value) {
   public int run(String[] args) throws Exception {
     LOG.info("starting");
-    Job job = Job.getInstance(new Cluster(getConf()), getConf());
+    Job job = Job.getInstance(getConf());
     Path inputDir = new Path(args[0]);
     Path outputDir = new Path(args[1]);
     boolean useSimplePartitioner = getUseSimplePartitioner(job);

@@ -157,7 +157,7 @@ private static void usage() throws IOException {
   }
 
   public int run(String[] args) throws Exception {
-    Job job = Job.getInstance(new Cluster(getConf()), getConf());
+    Job job = Job.getInstance(getConf());
     if (args.length != 2) {
       usage();
       return 1;

@@ -208,7 +208,7 @@ public void testReservedSlots() throws Exception {
     Configuration conf = mr.createJobConf();
 
     conf.setInt(JobContext.NUM_MAPS, 1);
-    Job job = Job.getInstance(cluster, conf);
+    Job job = Job.getInstance(conf);
     job.setNumReduceTasks(1);
     job.setSpeculativeExecution(false);
     job.setJobSetupCleanupNeeded(false);

@@ -199,7 +199,7 @@ public static JobConf createConfiguration() throws IOException {
   public static Job createJob() throws IOException {
     final Configuration conf = new Configuration();
-    final Job baseJob = Job.getInstance(new Cluster(conf), conf);
+    final Job baseJob = Job.getInstance(conf);
     baseJob.setOutputKeyClass(Text.class);
     baseJob.setOutputValueClass(IntWritable.class);
     baseJob.setMapperClass(NewMapTokenizer.class);

@@ -298,7 +298,7 @@ private static void runTest(String name, int keylen, int vallen,
       throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
-    Job job = Job.getInstance(new Cluster(conf), conf);
+    Job job = Job.getInstance(conf);
     conf = job.getConfiguration();
     conf.setInt(MRJobConfig.IO_SORT_MB, ioSortMB);
     conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT, Float.toString(spillPer));

@@ -409,7 +409,7 @@ public void testPostSpillMeta() throws Exception {
     // no writes into the serialization buffer
     Configuration conf = new Configuration();
     conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
-    Job job = Job.getInstance(new Cluster(conf), conf);
+    Job job = Job.getInstance(conf);
     conf = job.getConfiguration();
     conf.setInt(MRJobConfig.IO_SORT_MB, 1);
     // 2^20 * spill = 14336 bytes available post-spill, at most 896 meta

@@ -427,7 +427,7 @@ public void testPostSpillMeta() throws Exception {
   public void testLargeRecConcurrent() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
-    Job job = Job.getInstance(new Cluster(conf), conf);
+    Job job = Job.getInstance(conf);
     conf = job.getConfiguration();
     conf.setInt(MRJobConfig.IO_SORT_MB, 1);
     conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT, Float.toString(.986328125f));

@@ -496,7 +496,7 @@ public int valLen(int i) {
   public void testRandom() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
-    Job job = Job.getInstance(new Cluster(conf), conf);
+    Job job = Job.getInstance(conf);
     conf = job.getConfiguration();
     conf.setInt(MRJobConfig.IO_SORT_MB, 1);
     conf.setClass("test.mapcollection.class", RandomFactory.class,

@@ -517,7 +517,7 @@ public void testRandom() throws Exception {
   public void testRandomCompress() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
-    Job job = Job.getInstance(new Cluster(conf), conf);
+    Job job = Job.getInstance(conf);
     conf = job.getConfiguration();
     conf.setInt(MRJobConfig.IO_SORT_MB, 1);
     conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
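
Note: the repeated edit in this file also shows the idiom that the copy
semantics require: since Job.getInstance(conf) copies the configuration,
each test immediately re-points conf at the job's own copy before setting
further keys:

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    conf = job.getConfiguration();          // now the Job's own JobConf
    conf.setInt(MRJobConfig.IO_SORT_MB, 1); // changes visible to the job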

@@ -234,12 +234,11 @@ public void testReferenceCount() throws IOException, LoginException,
     }
     TrackerDistributedCacheManager manager =
         new FakeTrackerDistributedCacheManager(conf);
-    Cluster cluster = new Cluster(conf);
     String userName = getJobOwnerName();
     File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
 
     // Configures a job with a regular file
-    Job job1 = Job.getInstance(cluster, conf);
+    Job job1 = Job.getInstance(conf);
     job1.setUser(userName);
     job1.addCacheFile(secondCacheFile.toUri());
     Configuration conf1 = job1.getConfiguration();

@@ -262,7 +261,7 @@ public void testReferenceCount() throws IOException, LoginException,
     createPrivateTempFile(thirdCacheFile);
 
     // Configures another job with three regular files.
-    Job job2 = Job.getInstance(cluster, conf);
+    Job job2 = Job.getInstance(conf);
     job2.setUser(userName);
     // add a file that would get failed to localize
     job2.addCacheFile(firstCacheFile.toUri());

@@ -366,7 +365,6 @@ private Path checkLocalizedPath(boolean visibility)
       throws IOException, LoginException, InterruptedException {
     TrackerDistributedCacheManager manager =
         new TrackerDistributedCacheManager(conf, taskController);
-    Cluster cluster = new Cluster(conf);
     String userName = getJobOwnerName();
     File workDir = new File(TEST_ROOT_DIR, "workdir");
     Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");

@@ -376,7 +374,7 @@ private Path checkLocalizedPath(boolean visibility)
       createPrivateTempFile(cacheFile);
     }
 
-    Job job1 = Job.getInstance(cluster, conf);
+    Job job1 = Job.getInstance(conf);
     job1.setUser(userName);
     job1.addCacheFile(cacheFile.toUri());
     Configuration conf1 = job1.getConfiguration();