Merge -c 1185880 from trunk to branch-0.23 to complete fix for MAPREDUCE-2762.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1185881 13f79535-47bb-0310-9956-ffa450edef68
parent 5d639c9048
commit a990576916
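Note (not part of the commit below): this change makes the MR ApplicationMaster delete the job's staging directory on completion unless the job asked to keep its task files. As a hedged illustration only, assuming the stock org.apache.hadoop.mapred.JobConf API, a job could opt out of the cleanup as sketched here; the class name is hypothetical and nothing below belongs to the patch itself.

import org.apache.hadoop.mapred.JobConf;

public class KeepStagingDirExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Either of these makes the keepJobFiles(conf) check added in this change
    // return true, so the AM leaves the staging directory in place.
    conf.setKeepFailedTaskFiles(true);
    // conf.setKeepTaskFilesPattern(".*_m_000000_0");
    System.out.println("keep failed task files = " + conf.getKeepFailedTaskFiles());
  }
}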
@@ -1618,6 +1618,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3197. TestMRClientService failing on building clean checkout of
     branch 0.23 (mahadev)
 
+    MAPREDUCE-2762. Cleanup MR staging directory on completion. (mahadev via
+    acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
@@ -31,12 +31,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -228,6 +230,97 @@ public class MRAppMaster extends CompositeService {
     super.init(conf);
   } // end of init()
 
+
+  protected boolean keepJobFiles(JobConf conf) {
+    return (conf.getKeepTaskFilesPattern() != null || conf
+        .getKeepFailedTaskFiles());
+  }
+
+  /**
+   * Create the default file System for this job.
+   * @param conf the conf object
+   * @return the default filesystem for this job
+   * @throws IOException
+   */
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return FileSystem.get(conf);
+  }
+
+  /**
+   * clean up staging directories for the job.
+   * @throws IOException
+   */
+  public void cleanupStagingDir() throws IOException {
+    /* make sure we clean the staging files */
+    String jobTempDir = null;
+    FileSystem fs = getFileSystem(getConfig());
+    try {
+      if (!keepJobFiles(new JobConf(getConfig()))) {
+        jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
+        if (jobTempDir == null) {
+          LOG.warn("Job Staging directory is null");
+          return;
+        }
+        Path jobTempDirPath = new Path(jobTempDir);
+        LOG.info("Deleting staging directory " + fs.getDefaultUri(getConfig()) +
+            " " + jobTempDir);
+        fs.delete(jobTempDirPath, true);
+      }
+    } catch(IOException io) {
+      LOG.error("Failed to cleanup staging dir " + jobTempDir, io);
+    }
+  }
+
+  /**
+   * Exit call. Just in a function call to enable testing.
+   */
+  protected void sysexit() {
+    System.exit(0);
+  }
+
+  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
+    @Override
+    public void handle(JobFinishEvent event) {
+      // job has finished
+      // this is the only job, so shut down the Appmaster
+      // note in a workflow scenario, this may lead to creation of a new
+      // job (FIXME?)
+
+      // TODO:currently just wait for some time so clients can know the
+      // final states. Will be removed once RM come on.
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+      LOG.info("Calling stop for all the services");
+      try {
+        stop();
+      } catch (Throwable t) {
+        LOG.warn("Graceful stop failed ", t);
+      }
+      try {
+        cleanupStagingDir();
+      } catch(IOException io) {
+        LOG.warn("Failed to delete staging dir");
+      }
+      //TODO: this is required because rpc server does not shut down
+      // in spite of calling server.stop().
+      //Bring the process down by force.
+      //Not needed after HADOOP-7140
+      LOG.info("Exiting MR AppMaster..GoodBye!");
+      sysexit();
+    }
+  }
+
+  /**
+   * create an event handler that handles the job finish event.
+   * @return the job finish event handler.
+   */
+  protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
+    return new JobFinishEventHandler();
+  }
+
   /** Create and initialize (but don't start) a single job. */
   protected Job createJob(Configuration conf) {
 
@@ -238,36 +331,7 @@ public class MRAppMaster extends CompositeService {
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
-        new EventHandler<JobFinishEvent>() {
-          @Override
-          public void handle(JobFinishEvent event) {
-            // job has finished
-            // this is the only job, so shut down the Appmaster
-            // note in a workflow scenario, this may lead to creation of a new
-            // job (FIXME?)
-
-            // TODO:currently just wait for some time so clients can know the
-            // final states. Will be removed once RM come on.
-            try {
-              Thread.sleep(5000);
-            } catch (InterruptedException e) {
-              e.printStackTrace();
-            }
-            LOG.info("Calling stop for all the services");
-            try {
-              stop();
-            } catch (Throwable t) {
-              LOG.warn("Graceful stop failed ", t);
-            }
-            //TODO: this is required because rpc server does not shut down
-            // in spite of calling server.stop().
-            //Bring the process down by force.
-            //Not needed after HADOOP-7140
-            LOG.info("Exiting MR AppMaster..GoodBye!");
-            System.exit(0);
-          }
-        });
+        createJobFinishEventHandler());
 
     return newJob;
   } // end createJob()
@@ -553,6 +617,7 @@ public class MRAppMaster extends CompositeService {
 
     ///////////////////// Create the job itself.
     job = createJob(getConfig());
+
     // End of creating the job.
 
     // metrics system init is really init & start.
@@ -339,7 +339,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
               JobEventType.INTERNAL_ERROR))
-
     // create the topology tables
     .installTopology();
 
@@ -724,6 +723,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe));
   }
 
+  /**
+   * Create the default file System for this job.
+   * @param conf the conf object
+   * @return the default filesystem for this job
+   * @throws IOException
+   */
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return FileSystem.get(conf);
+  }
+
   static JobState checkJobCompleteSuccess(JobImpl job) {
     // check for Job success
     if (job.completedTaskCount == job.getTasks().size()) {
@@ -733,7 +742,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       } catch (IOException e) {
         LOG.warn("Could not do commit for Job", e);
       }
-
       job.logJobHistoryFinishedEvent();
       return job.finished(JobState.SUCCEEDED);
     }
@@ -816,7 +824,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       job.metrics.preparingJob(job);
       try {
         setup(job);
-        job.fs = FileSystem.get(job.conf);
+        job.fs = job.getFileSystem(job.conf);
 
         //log to job history
         JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
@@ -848,13 +856,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         LOG.info("Using mapred newApiCommitter.");
       }
 
-      LOG.info("OutputCommitter set in config " + job.conf.get("mapred.output.committer.class"));
+      LOG.info("OutputCommitter set in config " + job.conf.get(
+          "mapred.output.committer.class"));
 
       if (newApiCommitter) {
         job.jobContext = new JobContextImpl(job.conf,
             job.oldJobId);
-        org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = RecordFactoryProvider
-            .getRecordFactory(null)
+        org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
+            = RecordFactoryProvider.getRecordFactory(null)
             .newRecordInstance(
                 org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
         attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
@@ -884,14 +893,17 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       inputLength += taskSplitMetaInfo[i].getInputDataLength();
     }
 
-    //FIXME: need new memory criterion for uber-decision (oops, too late here; until AM-resizing supported, must depend on job client to pass fat-slot needs)
+    //FIXME: need new memory criterion for uber-decision (oops, too late here;
+    // until AM-resizing supported, must depend on job client to pass fat-slot needs)
     // these are no longer "system" settings, necessarily; user may override
     int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
     int sysMaxReduces =
         job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
     long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
-        job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is wrong; get FS from [File?]InputFormat and default block size from that
-    //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); // FIXME [could use default AM-container memory size...]
+        job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is
+        // wrong; get FS from [File?]InputFormat and default block size from that
+    //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot();
+    // FIXME [could use default AM-container memory size...]
 
     boolean uberEnabled =
         job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
@@ -900,8 +912,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     boolean smallInput = (inputLength <= sysMaxBytes);
     boolean smallMemory = true;  //FIXME (see above)
     // ignoring overhead due to UberTask and statics as negligible here:
    // FIXME  && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
    //          || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
     boolean notChainJob = !isChainJob(job.conf);
 
     // User has overall veto power over uberization, or user can modify
@@ -935,7 +947,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
 
       // disable speculation: makes no sense to speculate an entire job
-      // canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old version, ultimately was from conf.getMapSpeculativeExecution(), conf.getReduceSpeculativeExecution()]
+      //canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old
+      //version, ultimately was from conf.getMapSpeculativeExecution(),
+      //conf.getReduceSpeculativeExecution()]
     } else {
       StringBuilder msg = new StringBuilder();
       msg.append("Not uberizing ").append(job.jobId).append(" because:");
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Test;
+
+
+/**
+ * Make sure that the job staging directory clean up happens.
+ */
+public class TestStagingCleanup extends TestCase {
+
+  private Configuration conf = new Configuration();
+  private FileSystem fs;
+  private String stagingJobDir = "tmpJobDir";
+  private Path stagingJobPath = new Path(stagingJobDir);
+  private final static RecordFactory recordFactory = RecordFactoryProvider.
+      getRecordFactory(null);
+  private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class);
+
+  @Test
+  public void testDeletionofStaging() throws IOException {
+    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+    fs = mock(FileSystem.class);
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+        ApplicationAttemptId.class);
+    attemptId.setAttemptId(0);
+    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+    appId.setClusterTimestamp(System.currentTimeMillis());
+    appId.setId(0);
+    attemptId.setApplicationId(appId);
+    JobId jobid = recordFactory.newRecordInstance(JobId.class);
+    jobid.setAppId(appId);
+    MRAppMaster appMaster = new TestMRApp(attemptId);
+    EventHandler<JobFinishEvent> handler =
+        appMaster.createJobFinishEventHandler();
+    handler.handle(new JobFinishEvent(jobid));
+    verify(fs).delete(stagingJobPath, true);
+  }
+
+  private class TestMRApp extends MRAppMaster {
+
+    public TestMRApp(ApplicationAttemptId applicationAttemptId) {
+      super(applicationAttemptId);
+    }
+
+    @Override
+    protected FileSystem getFileSystem(Configuration conf) {
+      return fs;
+    }
+
+    @Override
+    protected void sysexit() {
+    }
+
+    @Override
+    public Configuration getConfig() {
+      return conf;
+    }
+  }
+
+}
@@ -1119,7 +1119,7 @@ abstract public class Task implements Writable, Configurable {
       // delete the staging area for the job
       JobConf conf = new JobConf(jobContext.getConfiguration());
       if (!keepTaskFiles(conf)) {
-        String jobTempDir = conf.get("mapreduce.job.dir");
+        String jobTempDir = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
         Path jobTempDirPath = new Path(jobTempDir);
         FileSystem fs = jobTempDirPath.getFileSystem(conf);
         fs.delete(jobTempDirPath, true);
@@ -341,7 +341,7 @@ class JobSubmitter {
     Path submitJobDir = new Path(jobStagingArea, jobId.toString());
     JobStatus status = null;
     try {
-      conf.set("mapreduce.job.dir", submitJobDir.toString());
+      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
       LOG.debug("Configuring job " + jobId + " with " + submitJobDir
           + " as the submit dir");
       // get delegation token for the dir
@@ -238,6 +238,8 @@ public interface MRJobConfig {
   public static final String REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
 
   public static final String REDUCE_ULIMIT = "mapreduce.reduce.ulimit";
 
+  public static final String MAPREDUCE_JOB_DIR = "mapreduce.job.dir";
+
   public static final String REDUCE_MAX_ATTEMPTS = "mapreduce.reduce.maxattempts";
 
@@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;