svn merge -c 1429114 FIXES: MAPREDUCE-4819. AM can rerun job after reporting final job status to the client (bobby and Bikas Saha via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1429115 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 2013-01-04 20:38:36 +00:00
parent 2cb090a075
commit 199df2b770
24 changed files with 1061 additions and 205 deletions
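For orientation before the per-file diffs: the fix works by dropping small marker files in the job staging directory around the output commit, so that a restarted AM attempt can tell whether the previous attempt already reported a final status to the client. The real decision logic lives in MRAppMaster.init() below; the following is only a hedged, illustrative sketch of that idea using the helper methods this patch adds (MRApps.getStartJobCommitFile and friends). It is not part of the commit.

// Sketch only: infer the fate of the previous attempt's commit from its markers.
static JobStateInternal previousCommitOutcome(Configuration conf, String user,
    JobId jobId) throws IOException {
  FileSystem fs = FileSystem.get(conf);
  if (!fs.exists(MRApps.getStartJobCommitFile(conf, user, jobId))) {
    return null;                        // no commit was started; recover normally
  }
  if (fs.exists(MRApps.getEndJobCommitSuccessFile(conf, user, jobId))) {
    return JobStateInternal.SUCCEEDED;  // committed, then crashed
  }
  if (fs.exists(MRApps.getEndJobCommitFailureFile(conf, user, jobId))) {
    return JobStateInternal.FAILED;     // commit failed, then crashed
  }
  return JobStateInternal.ERROR;        // crashed in the middle of the commit
}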


@ -504,6 +504,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4894. Renewal / cancellation of JobHistory tokens (Siddharth
Seth via tgraves)
+ MAPREDUCE-4819. AM can rerun job after reporting final job status to the
+ client (bobby and Bikas Saha via bobby)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES


@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.CompositeService;
/**
* Reads in history events from the JobHistoryFile and sends them out again
* to be recorded.
*/
public class JobHistoryCopyService extends CompositeService implements HistoryEventHandler {
private static final Log LOG = LogFactory.getLog(JobHistoryCopyService.class);
private final ApplicationAttemptId applicationAttemptId;
private final EventHandler handler;
private final JobId jobId;
public JobHistoryCopyService(ApplicationAttemptId applicationAttemptId,
EventHandler handler) {
super("JobHistoryCopyService");
this.applicationAttemptId = applicationAttemptId;
this.jobId = TypeConverter.toYarn(
TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
this.handler = handler;
}
@Override
public void init(Configuration conf) {
super.init(conf);
}
@Override
public void handleEvent(HistoryEvent event) throws IOException {
//Skip over the AM Events this is handled elsewhere
if (!(event instanceof AMStartedEvent)) {
handler.handle(new JobHistoryEvent(jobId, event));
}
}
@Override
public void start() {
try {
//TODO should we parse on a background thread???
parse();
} catch (IOException e) {
throw new YarnException(e);
}
super.start();
}
private void parse() throws IOException {
FSDataInputStream in = null;
try {
in = getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
} catch (IOException e) {
LOG.warn("error trying to open previous history file. No history data " +
"will be copied over.", e);
return;
}
JobHistoryParser parser = new JobHistoryParser(in);
parser.parse(this);
Exception parseException = parser.getParseException();
if (parseException != null) {
LOG.info("Got an error parsing job-history file" +
", ignoring incomplete events.", parseException);
}
}
public static FSDataInputStream getPreviousJobHistoryFileStream(
Configuration conf, ApplicationAttemptId applicationAttemptId)
throws IOException {
FSDataInputStream in = null;
Path historyFile = null;
String jobId =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
.toString();
String jobhistoryDir =
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
Path histDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
// read the previous history file
historyFile =
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
jobId, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
return in;
}
}
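JobHistoryCopyService above gives a restarted attempt a complete history file: it re-reads the previous attempt's staging history file (getPreviousJobHistoryFileStream) and re-emits every event except the AMStartedEvents through the supplied handler. A hedged sketch of how it is meant to be wired, mirroring the MRAppMaster change further down (appAttemptID and dispatcher are assumed to be in scope):

// Sketch only: replay attempt N-1's job history through the current dispatcher.
JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID,
    dispatcher.getEventHandler());
addIfService(cpHist);   // parsing of the old file happens in cpHist.start()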


@ -116,12 +116,15 @@ public class JobHistoryEventHandler extends AbstractService
*/
@Override
public void init(Configuration conf) {
+ String jobId =
+ TypeConverter.fromYarn(context.getApplicationID()).toString();
String stagingDirStr = null;
String doneDirStr = null;
String userDoneDirStr = null;
try {
- stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+ stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf,
+ jobId);
doneDirStr =
JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
userDoneDirStr =
@ -881,7 +884,7 @@ public class JobHistoryEventHandler extends AbstractService
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
// check if path exists, in case of retries it may not exist
if (stagingDirFS.exists(fromPath)) {
- LOG.info("Moving " + fromPath.toString() + " to " + toPath.toString());
+ LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
// TODO temporarily removing the existing dst
if (doneDirFS.exists(toPath)) {
doneDirFS.delete(toPath, true);
@ -895,8 +898,6 @@ public class JobHistoryEventHandler extends AbstractService
LOG.info("copy failed");
doneDirFS.setPermission(toPath, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
- stagingDirFS.delete(fromPath, false);
}
}


@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@ -66,6 +67,7 @@ import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@ -91,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
@ -110,6 +113,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
+ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
@ -179,9 +183,13 @@ public class MRAppMaster extends CompositeService {
private Job job;
private Credentials fsTokens = new Credentials(); // Filled during init
- private UserGroupInformation currentUser; // Will be setup during init
+ protected UserGroupInformation currentUser; // Will be setup during init
private volatile boolean isLastAMRetry = false;
+ //Something happened and we should shut down right after we start up.
+ boolean errorHappenedShutDown = false;
+ private String shutDownMessage = null;
+ JobStateInternal forcedState = null;
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
@ -243,6 +251,86 @@ public class MRAppMaster extends CompositeService {
LOG.info("Using mapred newApiCommitter.");
}
boolean copyHistory = false;
try {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
FileSystem fs = getFileSystem(conf);
boolean stagingExists = fs.exists(stagingDir);
Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
boolean commitStarted = fs.exists(startCommitFile);
Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
boolean commitSuccess = fs.exists(endCommitSuccessFile);
Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
boolean commitFailure = fs.exists(endCommitFailureFile);
if(!stagingExists) {
isLastAMRetry = true;
errorHappenedShutDown = true;
forcedState = JobStateInternal.ERROR;
shutDownMessage = "Staging dir does not exist " + stagingDir;
LOG.fatal(shutDownMessage);
} else if (commitStarted) {
//A commit was started so this is the last time, we just need to know
// what result we will use to notify, and how we will unregister
errorHappenedShutDown = true;
isLastAMRetry = true;
copyHistory = true;
if (commitSuccess) {
shutDownMessage = "We crashed after successfully committing. Recovering.";
forcedState = JobStateInternal.SUCCEEDED;
} else if (commitFailure) {
shutDownMessage = "We crashed after a commit failure.";
forcedState = JobStateInternal.FAILED;
} else {
//The commit is still pending, commit error
shutDownMessage = "We crashed durring a commit";
forcedState = JobStateInternal.ERROR;
}
}
} catch (IOException e) {
throw new YarnException("Error while initializing", e);
}
if (errorHappenedShutDown) {
dispatcher = createDispatcher();
addIfService(dispatcher);
EventHandler<JobHistoryEvent> historyService = null;
if (copyHistory) {
historyService =
createJobHistoryHandler(context);
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
historyService);
}
NoopEventHandler eater = new NoopEventHandler();
//We do not have a JobEventDispatcher in this path
dispatcher.register(JobEventType.class, eater);
// service to allocate containers from RM (if non-uber) or to fake it (uber)
containerAllocator = createContainerAllocator(null, context);
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
if (copyHistory) {
// Add the staging directory cleaner before the history server but after
// the container allocator so the staging directory is cleaned after
// the history has been flushed but before unregistering with the RM.
addService(createStagingDirCleaningService());
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
// ahead with shutdown.
// Note: Even though JobHistoryEventHandler is started last, if any
// component creates a JobHistoryEvent in the meanwhile, it will be just be
// queued inside the JobHistoryEventHandler
addIfService(historyService);
JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID,
dispatcher.getEventHandler());
addIfService(cpHist);
}
} else {
committer = createOutputCommitter(conf);
boolean recoveryEnabled = conf.getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
@ -271,14 +359,14 @@ public class MRAppMaster extends CompositeService {
containerAllocator = createContainerAllocator(clientService, context);
- //service to handle requests to TaskUmbilicalProtocol
- taskAttemptListener = createTaskAttemptListener(context);
- addIfService(taskAttemptListener);
//service to handle the output committer
committerEventHandler = createCommitterEventHandler(context, committer);
addIfService(committerEventHandler);
+ //service to handle requests to TaskUmbilicalProtocol
+ taskAttemptListener = createTaskAttemptListener(context);
+ addIfService(taskAttemptListener);
//service to log job history events
EventHandler<JobHistoryEvent> historyService =
createJobHistoryHandler(context);
@ -326,6 +414,7 @@ public class MRAppMaster extends CompositeService {
// component creates a JobHistoryEvent in the meanwhile, it will be just be
// queued inside the JobHistoryEventHandler
addIfService(historyService);
+ }
super.init(conf);
} // end of init()
@ -489,15 +578,20 @@ public class MRAppMaster extends CompositeService {
appContext.getClock(), getCommitter());
}
- /** Create and initialize (but don't start) a single job. */
- protected Job createJob(Configuration conf) {
+ /** Create and initialize (but don't start) a single job.
+ * @param forcedState a state to force the job into or null for normal operation.
+ * @param diagnostic a diagnostic message to include with the job.
+ */
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
// create single job
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, newApiCommitter,
- currentUser.getUserName(), appSubmitTime, amInfos, context);
+ currentUser.getUserName(), appSubmitTime, amInfos, context,
+ forcedState, diagnostic);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
@ -874,7 +968,7 @@ public class MRAppMaster extends CompositeService {
amInfos.add(amInfo);
// /////////////////// Create the job itself.
- job = createJob(getConfig());
+ job = createJob(getConfig(), forcedState, shutDownMessage);
// End of creating the job.
@ -891,6 +985,7 @@ public class MRAppMaster extends CompositeService {
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");
if (!errorHappenedShutDown) {
// create a job event for job intialization
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
// Send init to the job (this does NOT trigger job execution)
@ -917,6 +1012,7 @@ public class MRAppMaster extends CompositeService {
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + ".");
}
+ }
//start all the components
super.start();
@ -1062,6 +1158,17 @@ public class MRAppMaster extends CompositeService {
}
/**
* Eats events that are not needed in some error cases.
*/
private static class NoopEventHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
//Empty
}
}
private static void validateInputParam(String value, String param)
throws IOException {
if (value == null) {
@ -1158,6 +1265,9 @@ public class MRAppMaster extends CompositeService {
public Object run() throws Exception {
appMaster.init(conf);
appMaster.start();
+ if(appMaster.errorHappenedShutDown) {
+ throw new IOException("Was asked to shut down.");
+ }
return null;
}
});
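The last hunk above is the piece that makes the whole error path visible to the process driver: the AM still goes through init() and start() so it can register with the RM, replay the previous attempt's history when a commit had started, and unregister with the forced final state, and only afterwards does initAndStartAppMaster() throw. A hedged sketch of what a caller of that (real) static helper sees; the variable names appMaster, conf and jobUserName are placeholders:

try {
  MRAppMaster.initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (IOException e) {
  // "Was asked to shut down." -- a commit had already started in a previous
  // attempt, so this attempt only reported the recovered final status and
  // must exit instead of re-running the job.
}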


@ -29,8 +29,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
+ import org.apache.hadoop.mapreduce.TypeConverter;
+ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
@ -40,6 +45,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+ import org.apache.hadoop.mapreduce.v2.util.MRApps;
+ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler;
@ -64,6 +71,11 @@ public class CommitterEventHandler extends AbstractService
private Thread jobCommitThread = null;
private int commitThreadCancelTimeoutMs;
private long commitWindowMs;
private FileSystem fs;
private Path startCommitFile;
private Path endCommitSuccessFile;
private Path endCommitFailureFile;
public CommitterEventHandler(AppContext context, OutputCommitter committer,
RMHeartbeatHandler rmHeartbeatHandler) {
@ -82,6 +94,17 @@ public class CommitterEventHandler extends AbstractService
MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS);
commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
try {
fs = FileSystem.get(conf);
JobID id = TypeConverter.fromYarn(context.getApplicationID());
JobId jobId = TypeConverter.toYarn(id);
String user = UserGroupInformation.getCurrentUser().getShortUserName();
startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user, jobId);
} catch (IOException e) {
throw new YarnException(e);
}
}
@Override
@ -213,15 +236,26 @@ public class CommitterEventHandler extends AbstractService
}
}
private void touchz(Path p) throws IOException {
fs.create(p, false).close();
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void handleJobCommit(CommitterJobCommitEvent event) { protected void handleJobCommit(CommitterJobCommitEvent event) {
try { try {
touchz(startCommitFile);
jobCommitStarted(); jobCommitStarted();
waitForValidCommitWindow(); waitForValidCommitWindow();
committer.commitJob(event.getJobContext()); committer.commitJob(event.getJobContext());
touchz(endCommitSuccessFile);
context.getEventHandler().handle( context.getEventHandler().handle(
new JobCommitCompletedEvent(event.getJobID())); new JobCommitCompletedEvent(event.getJobID()));
} catch (Exception e) { } catch (Exception e) {
try {
touchz(endCommitFailureFile);
} catch (Exception e2) {
LOG.error("could not create failure file.", e2);
}
LOG.error("Could not commit job", e); LOG.error("Could not commit job", e);
context.getEventHandler().handle( context.getEventHandler().handle(
new JobCommitFailedEvent(event.getJobID(), new JobCommitFailedEvent(event.getJobID(),
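One detail of the handleJobCommit() change above worth spelling out: touchz() creates each marker with overwrite set to false, so an existing marker can never be silently recreated. A hedged sketch of that semantics (conf, user and jobId are placeholders; the path helper is the one added by this patch):

FileSystem fs = FileSystem.get(conf);
Path start = MRApps.getStartJobCommitFile(conf, user, jobId);
fs.create(start, false).close();   // zero-length marker: "a commit has begun"
// A second create(start, false) would fail with an IOException here rather
// than quietly truncating the existing marker.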


@ -541,6 +541,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
+ private JobStateInternal forcedState = null;
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
@ -548,7 +550,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
Credentials fsTokenCredentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
boolean newApiCommitter, String userName,
- long appSubmitTime, List<AMInfo> amInfos, AppContext appContext) {
+ long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
+ JobStateInternal forcedState, String forcedDiagnostic) {
this.applicationAttemptId = applicationAttemptId;
this.jobId = jobId;
this.jobName = conf.get(JobContext.JOB_NAME, "<missing job name>");
@ -579,6 +582,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// This "this leak" is okay because the retained pointer is in an // This "this leak" is okay because the retained pointer is in an
// instance variable. // instance variable.
stateMachine = stateMachineFactory.make(this); stateMachine = stateMachineFactory.make(this);
this.forcedState = forcedState;
if(forcedDiagnostic != null) {
this.diagnostics.add(forcedDiagnostic);
}
} }
protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() { protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@ -818,7 +825,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
public JobState getState() { public JobState getState() {
readLock.lock(); readLock.lock();
try { try {
return getExternalState(getStateMachine().getCurrentState()); return getExternalState(getInternalState());
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -868,6 +875,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
public JobStateInternal getInternalState() { public JobStateInternal getInternalState() {
readLock.lock(); readLock.lock();
try { try {
if(forcedState != null) {
return forcedState;
}
return getStateMachine().getCurrentState(); return getStateMachine().getCurrentState();
} finally { } finally {
readLock.unlock(); readLock.unlock();


@ -205,18 +205,18 @@ public class RecoveryService extends CompositeService implements Recovery {
throws IOException {
FSDataInputStream in = null;
Path historyFile = null;
- String jobName =
+ String jobId =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
.toString();
String jobhistoryDir =
- JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+ JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
Path histDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
// read the previous history file
historyFile =
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
- jobName, (applicationAttemptId.getAttemptId() - 1)));
+ jobId, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
return in;


@ -141,14 +141,19 @@ public abstract class RMCommunicator extends AbstractService
protected void register() {
//Register
- InetSocketAddress serviceAddr = clientService.getBindAddress();
+ InetSocketAddress serviceAddr = null;
+ if (clientService != null ) {
+ serviceAddr = clientService.getBindAddress();
+ }
try {
RegisterApplicationMasterRequest request =
recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
request.setApplicationAttemptId(applicationAttemptId);
+ if (serviceAddr != null) {
request.setHost(serviceAddr.getHostName());
request.setRpcPort(serviceAddr.getPort());
request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+ }
RegisterApplicationMasterResponse response =
scheduler.registerApplicationMaster(request);
minContainerCapability = response.getMinimumResourceCapability();


@ -18,14 +18,9 @@
package org.apache.hadoop.mapreduce.jobhistory;
- import static junit.framework.Assert.assertFalse;
- import static junit.framework.Assert.assertTrue;
- import static org.mockito.Matchers.any;
- import static org.mockito.Mockito.mock;
- import static org.mockito.Mockito.spy;
- import static org.mockito.Mockito.times;
- import static org.mockito.Mockito.verify;
- import static org.mockito.Mockito.when;
+ import static junit.framework.Assert.*;
+ import static org.mockito.Matchers.*;
+ import static org.mockito.Mockito.*;
import java.io.File;
import java.io.IOException;
@ -51,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.Mockito;
- import org.mockito.verification.VerificationMode;
public class TestJobHistoryEventHandler {
@ -260,13 +254,15 @@ public class TestJobHistoryEventHandler {
}
}
- private AppContext mockAppContext(JobId jobId) {
+ private AppContext mockAppContext(ApplicationId appId) {
+ JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
AppContext mockContext = mock(AppContext.class);
Job mockJob = mock(Job.class);
when(mockJob.getTotalMaps()).thenReturn(10);
when(mockJob.getTotalReduces()).thenReturn(10);
when(mockJob.getName()).thenReturn("mockjob");
when(mockContext.getJob(jobId)).thenReturn(mockJob);
+ when(mockContext.getApplicationID()).thenReturn(appId);
return mockContext;
}
@ -279,7 +275,7 @@ public class TestJobHistoryEventHandler {
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
- AppContext mockAppContext = mockAppContext(jobId);
+ AppContext mockAppContext = mockAppContext(appId);
}
private JobHistoryEvent getEventToEnqueue(JobId jobId) {


@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
+ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.JobContext;
@ -208,6 +209,16 @@ public class MRApp extends MRAppMaster {
@Override
public void init(Configuration conf) {
try {
//Create the staging directory if it does not exist
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
FileSystem fs = getFileSystem(conf);
fs.mkdirs(stagingDir);
} catch (Exception e) {
throw new YarnException("Error creating staging dir", e);
}
super.init(conf);
if (this.clusterInfo != null) {
getContext().getClusterInfo().setMinContainerCapability(
@ -388,7 +399,8 @@ public class MRApp extends MRAppMaster {
}
@Override
- protected Job createJob(Configuration conf) {
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
@ -398,7 +410,8 @@ public class MRApp extends MRAppMaster {
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- isNewApiCommitter(), currentUser.getUserName(), getContext());
+ isNewApiCommitter(), currentUser.getUserName(), getContext(),
+ forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
@ -631,13 +644,14 @@ public class MRApp extends MRAppMaster {
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
- boolean newApiCommitter, String user, AppContext appContext) {
+ boolean newApiCommitter, String user, AppContext appContext,
+ JobStateInternal forcedState, String diagnostic) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
- appContext);
+ appContext, forcedState, diagnostic);
// This "this leak" is okay because the retained pointer is in an
// instance variable.


@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
@ -370,8 +371,9 @@ public class TestMRApp {
}
@Override
- protected Job createJob(Configuration conf) {
- spiedJob = spy((JobImpl) super.createJob(conf));
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
+ spiedJob = spy((JobImpl) super.createJob(conf, forcedState, diagnostic));
((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
return spiedJob;
}


@ -17,28 +17,63 @@
*/
package org.apache.hadoop.mapreduce.v2.app;
+ import static org.junit.Assert.*;
+ import static org.mockito.Mockito.*;
+ import java.io.File;
import java.io.IOException;
- import junit.framework.Assert;
+ import org.apache.commons.io.FileUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
+ import org.apache.hadoop.mapreduce.TypeConverter;
+ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
+ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
import org.junit.Test;
public class TestMRAppMaster {
private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
static String stagingDir = "staging/";
@BeforeClass
public static void setup() {
//Do not error out if metrics are inited multiple times
DefaultMetricsSystem.setMiniClusterMode(true);
File dir = new File(stagingDir);
stagingDir = dir.getAbsolutePath();
}
@Before
public void cleanup() throws IOException {
File dir = new File(stagingDir);
if(dir.exists()) {
FileUtils.deleteDirectory(dir);
}
dir.mkdirs();
}
@Test
public void testMRAppMasterForDifferentUser() throws IOException,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
String containerIdStr = "container_1317529182569_0004_000001_1";
- String stagingDir = "/tmp/staging";
String userName = "TestAppMasterUser";
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
@ -49,34 +84,208 @@ public class TestMRAppMaster {
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
- Assert.assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+ assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+ ".staging", appMaster.stagingDirPath.toString());
}
@Test
public void testMRAppMasterMidLock() throws IOException,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser";
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
JobId jobId = TypeConverter.toYarn(
TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
FileSystem fs = FileSystem.get(conf);
//Create the file, but no end file so we should unregister with an error.
fs.create(start).close();
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
} catch (IOException e) {
//The IO Exception is expected
LOG.info("Caught expected Exception", e);
caught = true;
}
assertTrue(caught);
assertTrue(appMaster.errorHappenedShutDown);
assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
appMaster.stop();
}
@Test
public void testMRAppMasterSuccessLock() throws IOException,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser";
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
JobId jobId = TypeConverter.toYarn(
TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
Path end = MRApps.getEndJobCommitSuccessFile(conf, userName, jobId);
FileSystem fs = FileSystem.get(conf);
fs.create(start).close();
fs.create(end).close();
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
} catch (IOException e) {
//The IO Exception is expected
LOG.info("Caught expected Exception", e);
caught = true;
}
assertTrue(caught);
assertTrue(appMaster.errorHappenedShutDown);
assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
appMaster.stop();
}
@Test
public void testMRAppMasterFailLock() throws IOException,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser";
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
JobId jobId = TypeConverter.toYarn(
TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
Path end = MRApps.getEndJobCommitFailureFile(conf, userName, jobId);
FileSystem fs = FileSystem.get(conf);
fs.create(start).close();
fs.create(end).close();
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
} catch (IOException e) {
//The IO Exception is expected
LOG.info("Caught expected Exception", e);
caught = true;
}
assertTrue(caught);
assertTrue(appMaster.errorHappenedShutDown);
assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
appMaster.stop();
}
@Test
public void testMRAppMasterMissingStaging() throws IOException,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser";
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
//Delete the staging directory
File dir = new File(stagingDir);
if(dir.exists()) {
FileUtils.deleteDirectory(dir);
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
MRAppMaster appMaster =
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
System.currentTimeMillis(), false);
boolean caught = false;
try {
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
} catch (IOException e) {
//The IO Exception is expected
LOG.info("Caught expected Exception", e);
caught = true;
}
assertTrue(caught);
assertTrue(appMaster.errorHappenedShutDown);
//Copying the history file is disabled, but it is not really visible from
//here
assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
appMaster.stop();
}
}
class MRAppMasterTest extends MRAppMaster {
Path stagingDirPath;
private Configuration conf;
private boolean overrideInitAndStart;
ContainerAllocator mockContainerAllocator;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort, ContainerId containerId, String host, int port, int httpPort,
long submitTime) { long submitTime) {
this(applicationAttemptId, containerId, host, port, httpPort, submitTime,
true);
}
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
long submitTime, boolean overrideInitAndStart) {
super(applicationAttemptId, containerId, host, port, httpPort, submitTime); super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
this.overrideInitAndStart = overrideInitAndStart;
mockContainerAllocator = mock(ContainerAllocator.class);
}
@Override
public void init(Configuration conf) {
+ if (overrideInitAndStart) {
this.conf = conf;
+ } else {
super.init(conf);
}
}
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
try {
this.currentUser = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new YarnException(e);
}
}
@Override
protected ContainerAllocator createContainerAllocator(
final ClientService clientService, final AppContext context) {
return mockContainerAllocator;
}
@Override
public void start() {
if (overrideInitAndStart) {
try {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
stagingDirPath = MRApps.getStagingAreaDir(conf, user);
} catch (Exception e) {
- Assert.fail(e.getMessage());
+ fail(e.getMessage());
}
} else {
super.start();
} }
}


@ -38,11 +38,13 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -72,6 +74,10 @@ import org.junit.Test;
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ //Staging Dir exists
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
@ -93,6 +99,10 @@ import org.junit.Test;
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
//Staging Dir exists
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
@ -118,6 +128,10 @@ import org.junit.Test;
conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+ //Staging Dir exists
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(1);
@ -198,7 +212,8 @@ import org.junit.Test;
}
@Override
- protected Job createJob(Configuration conf) {
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
UserGroupInformation currentUser = null;
try {
currentUser = UserGroupInformation.getCurrentUser();
@ -208,7 +223,8 @@ import org.junit.Test;
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- isNewApiCommitter(), currentUser.getUserName(), getContext());
+ isNewApiCommitter(), currentUser.getUserName(), getContext(),
+ forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,


@ -36,16 +36,85 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+ import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestCommitterEventHandler {
public static class WaitForItHandler implements EventHandler {
private Event event = null;
@Override
public synchronized void handle(Event event) {
this.event = event;
notifyAll();
}
public synchronized Event getAndClearEvent() throws InterruptedException {
if (event == null) {
//Wait for at most 100 ms
wait(100);
}
Event e = event;
event = null;
return e;
}
}
static String stagingDir = "target/test-staging/";
@BeforeClass
public static void setup() {
File dir = new File(stagingDir);
stagingDir = dir.getAbsolutePath();
}
@Before
public void cleanup() throws IOException {
File dir = new File(stagingDir);
if(dir.exists()) {
FileUtils.deleteDirectory(dir);
}
dir.mkdirs();
}
  @Test
  public void testCommitWindow() throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
@ -55,6 +124,10 @@ public class TestCommitterEventHandler {
    SystemClock clock = new SystemClock();
    AppContext appContext = mock(AppContext.class);
    ApplicationAttemptId attemptid =
        ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
    when(appContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
    when(appContext.getApplicationAttemptId()).thenReturn(attemptid);
    when(appContext.getEventHandler()).thenReturn(
        dispatcher.getEventHandler());
    when(appContext.getClock()).thenReturn(clock);
@ -91,6 +164,9 @@ public class TestCommitterEventHandler {
        1, jeh.numCommitCompletedEvents);
    verify(committer, times(1)).commitJob(any(JobContext.class));
    //Clean up so we can try to commit again (Don't do this at home)
    cleanup();
    // try to commit again and verify it goes through since the heartbeat
    // is still fresh
    ceh.handle(new CommitterJobCommitEvent(null, null));
@ -147,4 +223,103 @@ public class TestCommitterEventHandler {
      }
    }
  }
@Test
public void testBasic() throws Exception {
AppContext mockContext = mock(AppContext.class);
OutputCommitter mockCommitter = mock(OutputCommitter.class);
Clock mockClock = mock(Clock.class);
CommitterEventHandler handler = new CommitterEventHandler(mockContext,
mockCommitter, new TestingRMHeartbeatHandler());
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
JobContext mockJobContext = mock(JobContext.class);
ApplicationAttemptId attemptid =
ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
JobId jobId = TypeConverter.toYarn(
TypeConverter.fromYarn(attemptid.getApplicationId()));
WaitForItHandler waitForItHandler = new WaitForItHandler();
when(mockContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
when(mockContext.getApplicationAttemptId()).thenReturn(attemptid);
when(mockContext.getEventHandler()).thenReturn(waitForItHandler);
when(mockContext.getClock()).thenReturn(mockClock);
handler.init(conf);
handler.start();
try {
handler.handle(new CommitterJobCommitEvent(jobId, mockJobContext));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user,
jobId);
Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user,
jobId);
Event e = waitForItHandler.getAndClearEvent();
assertNotNull(e);
assertTrue(e instanceof JobCommitCompletedEvent);
FileSystem fs = FileSystem.get(conf);
assertTrue(startCommitFile.toString(), fs.exists(startCommitFile));
assertTrue(endCommitSuccessFile.toString(), fs.exists(endCommitSuccessFile));
assertFalse(endCommitFailureFile.toString(), fs.exists(endCommitFailureFile));
verify(mockCommitter).commitJob(any(JobContext.class));
} finally {
handler.stop();
}
}
@Test
public void testFailure() throws Exception {
AppContext mockContext = mock(AppContext.class);
OutputCommitter mockCommitter = mock(OutputCommitter.class);
Clock mockClock = mock(Clock.class);
CommitterEventHandler handler = new CommitterEventHandler(mockContext,
mockCommitter, new TestingRMHeartbeatHandler());
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
JobContext mockJobContext = mock(JobContext.class);
ApplicationAttemptId attemptid =
ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
JobId jobId = TypeConverter.toYarn(
TypeConverter.fromYarn(attemptid.getApplicationId()));
WaitForItHandler waitForItHandler = new WaitForItHandler();
when(mockContext.getApplicationID()).thenReturn(attemptid.getApplicationId());
when(mockContext.getApplicationAttemptId()).thenReturn(attemptid);
when(mockContext.getEventHandler()).thenReturn(waitForItHandler);
when(mockContext.getClock()).thenReturn(mockClock);
doThrow(new YarnException("Intentional Failure")).when(mockCommitter)
.commitJob(any(JobContext.class));
handler.init(conf);
handler.start();
try {
handler.handle(new CommitterJobCommitEvent(jobId, mockJobContext));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path startCommitFile = MRApps.getStartJobCommitFile(conf, user, jobId);
Path endCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(conf, user,
jobId);
Path endCommitFailureFile = MRApps.getEndJobCommitFailureFile(conf, user,
jobId);
Event e = waitForItHandler.getAndClearEvent();
assertNotNull(e);
assertTrue(e instanceof JobCommitFailedEvent);
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(startCommitFile));
assertFalse(fs.exists(endCommitSuccessFile));
assertTrue(fs.exists(endCommitFailureFile));
verify(mockCommitter).commitJob(any(JobContext.class));
} finally {
handler.stop();
}
}
}
@ -23,11 +23,13 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
@ -67,8 +69,11 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@ -78,10 +83,28 @@ import org.junit.Test;
@SuppressWarnings({"rawtypes"})
public class TestJobImpl {
static String stagingDir = "target/test-staging/";
@BeforeClass
public static void setup() {
File dir = new File(stagingDir);
stagingDir = dir.getAbsolutePath();
}
@Before
public void cleanup() throws IOException {
File dir = new File(stagingDir);
if(dir.exists()) {
FileUtils.deleteDirectory(dir);
}
dir.mkdirs();
}
  @Test
  public void testJobNoTasks() {
    Configuration conf = new Configuration();
    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
@ -103,6 +126,7 @@ public class TestJobImpl {
  @Test(timeout=20000)
  public void testCommitJobFailsJob() throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
@ -127,6 +151,7 @@ public class TestJobImpl {
  @Test(timeout=20000)
  public void testCheckJobCompleteSuccess() throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
@ -151,6 +176,7 @@ public class TestJobImpl {
  @Test(timeout=20000)
  public void testKilledDuringSetup() throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
@ -187,6 +213,7 @@ public class TestJobImpl {
  @Test(timeout=20000)
  public void testKilledDuringCommit() throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
@ -211,6 +238,7 @@ public class TestJobImpl {
  @Test(timeout=20000)
  public void testKilledDuringFailAbort() throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
@ -252,6 +280,7 @@ public class TestJobImpl {
  @Test(timeout=20000)
  public void testKilledDuringKillAbort() throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
@ -316,7 +345,7 @@ public class TestJobImpl {
    // Verify access
    JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-       null, null, true, null, 0, null, null);
+       null, null, true, null, 0, null, null, null, null);
    Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
    Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -327,7 +356,7 @@ public class TestJobImpl {
    // Verify access
    JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-       null, null, true, null, 0, null, null);
+       null, null, true, null, 0, null, null, null, null);
    Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
    Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -338,7 +367,7 @@ public class TestJobImpl {
    // Verify access
    JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-       null, null, true, null, 0, null, null);
+       null, null, true, null, 0, null, null, null, null);
    Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
    Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -349,7 +378,7 @@ public class TestJobImpl {
    // Verify access
    JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-       null, null, true, null, 0, null, null);
+       null, null, true, null, 0, null, null, null, null);
    Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
    Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -360,7 +389,7 @@ public class TestJobImpl {
    // Verify access
    JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-       null, null, true, null, 0, null, null);
+       null, null, true, null, 0, null, null, null, null);
    Assert.assertTrue(job5.checkAccess(ugi1, null));
    Assert.assertTrue(job5.checkAccess(ugi2, null));
  }
@ -378,7 +407,7 @@ public class TestJobImpl {
        mock(EventHandler.class),
        null, mock(JobTokenSecretManager.class), null,
        new SystemClock(), null,
-       mrAppMetrics, true, null, 0, null, null);
+       mrAppMetrics, true, null, 0, null, null, null, null);
    job.handle(diagUpdateEvent);
    String diagnostics = job.getReport().getDiagnostics();
    Assert.assertNotNull(diagnostics);
@ -389,7 +418,7 @@ public class TestJobImpl {
        mock(EventHandler.class),
        null, mock(JobTokenSecretManager.class), null,
        new SystemClock(), null,
-       mrAppMetrics, true, null, 0, null, null);
+       mrAppMetrics, true, null, 0, null, null, null, null);
    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
    job.handle(diagUpdateEvent);
    diagnostics = job.getReport().getDiagnostics();
@ -444,7 +473,7 @@ public class TestJobImpl {
    JobImpl job = new JobImpl(jobId, Records
        .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
        null, mock(JobTokenSecretManager.class), null, null, null,
-       mrAppMetrics, true, null, 0, null, null);
+       mrAppMetrics, true, null, 0, null, null, null, null);
    InitTransition initTransition = getInitTransition(2);
    JobEvent mockJobEvent = mock(JobEvent.class);
    initTransition.transition(job, mockJobEvent);
@ -518,6 +547,10 @@ public class TestJobImpl {
        callback.run();
      }
    };
ApplicationAttemptId id =
ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
when(appContext.getApplicationID()).thenReturn(id.getApplicationId());
when(appContext.getApplicationAttemptId()).thenReturn(id);
    CommitterEventHandler handler =
        new CommitterEventHandler(appContext, committer, heartbeatHandler);
    dispatcher.register(CommitterEventType.class, handler);
@ -601,7 +634,8 @@ public class TestJobImpl {
      super(jobId, applicationAttemptId, conf, eventHandler,
          null, new JobTokenSecretManager(), new Credentials(),
          new SystemClock(), null, MRAppMetrics.create(),
-         newApiCommitter, user, System.currentTimeMillis(), null, null);
+         newApiCommitter, user, System.currentTimeMillis(), null, null, null,
+         null);
      initTransition = getInitTransition(numSplits);
      localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
@ -182,14 +182,16 @@ public class JobHistoryUtils {
  /**
   * Gets the configured directory prefix for In Progress history files.
-  * @param conf
+  * @param conf the configuration for the job
+  * @param jobId the id of the job the history file is for.
   * @return A string representation of the prefix.
   */
  public static String
-     getConfiguredHistoryStagingDirPrefix(Configuration conf)
+     getConfiguredHistoryStagingDirPrefix(Configuration conf, String jobId)
          throws IOException {
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-   Path path = MRApps.getStagingAreaDir(conf, user);
+   Path stagingPath = MRApps.getStagingAreaDir(conf, user);
+   Path path = new Path(stagingPath, jobId);
    String logDir = path.toString();
    return logDir;
  }
@ -255,7 +255,26 @@ public class MRApps extends Apps {
    return jobFile.toString();
  }
public static Path getEndJobCommitSuccessFile(Configuration conf, String user,
JobId jobId) {
Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
jobId.toString() + Path.SEPARATOR + "COMMIT_SUCCESS");
return endCommitFile;
}
public static Path getEndJobCommitFailureFile(Configuration conf, String user,
JobId jobId) {
Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
jobId.toString() + Path.SEPARATOR + "COMMIT_FAIL");
return endCommitFile;
}
public static Path getStartJobCommitFile(Configuration conf, String user,
JobId jobId) {
Path startCommitFile = new Path(MRApps.getStagingAreaDir(conf, user),
jobId.toString() + Path.SEPARATOR + "COMMIT_STARTED");
return startCommitFile;
}
  private static long[] parseTimeStamps(String[] strs) {
    if (null == strs) {
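The three helpers added above give the MRAppMaster fixed marker paths (COMMIT_STARTED, COMMIT_SUCCESS, COMMIT_FAIL) under the job's staging directory. Below is a rough sketch of how a restarted AM attempt could read those markers to decide whether an earlier attempt already committed the job; the class name and decision logic are purely illustrative and are not the recovery code that lives in JobImpl.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;

public class CommitMarkerCheck {
  // Illustrative only: report how far the previous AM attempt got with commit.
  public static String previousCommitState(Configuration conf, JobId jobId)
      throws IOException {
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    FileSystem fs = FileSystem.get(conf);
    Path started = MRApps.getStartJobCommitFile(conf, user, jobId);
    Path success = MRApps.getEndJobCommitSuccessFile(conf, user, jobId);
    Path failure = MRApps.getEndJobCommitFailureFile(conf, user, jobId);

    if (fs.exists(success)) {
      return "COMMIT_SUCCESS";   // output was already committed; do not rerun
    } else if (fs.exists(failure)) {
      return "COMMIT_FAIL";      // commit was attempted and failed
    } else if (fs.exists(started)) {
      return "COMMIT_STARTED";   // commit started but its outcome is unknown
    }
    return "NO_COMMIT";          // no commit was attempted yet
  }
}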
@ -52,6 +52,14 @@ import org.apache.hadoop.classification.InterfaceStability;
 *   Discard the task commit.
 *   </li>
 * </ol>
* The methods in this class can be called from several different processes and
* from several different contexts. It is important to know which process and
* which context each is called from. Each method should be marked accordingly
* in its documentation. It is also important to note that not all methods are
* guaranteed to be called once and only once. If a method is not guaranteed to
* have this property the output committer needs to handle this appropriately.
* Also note it will only be in rare situations where they may be called
* multiple times for the same task.
 *
 * @see FileOutputCommitter 
 * @see JobContext
@ -62,7 +70,9 @@ import org.apache.hadoop.classification.InterfaceStability;
public abstract class OutputCommitter 
    extends org.apache.hadoop.mapreduce.OutputCommitter {
  /**
-  * For the framework to setup the job output during initialization
+  * For the framework to setup the job output during initialization. This is
+  * called from the application master process for the entire job. This will be
+  * called multiple times, once per job attempt.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if temporary output could not be created
@ -70,7 +80,9 @@ public abstract class OutputCommitter
  public abstract void setupJob(JobContext jobContext) throws IOException;

  /**
-  * For cleaning up the job's output after job completion
+  * For cleaning up the job's output after job completion. This is called
+  * from the application master process for the entire job. This may be called
+  * multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException
@ -82,7 +94,10 @@ public abstract class OutputCommitter
  /**
   * For committing job's output after successful job completion. Note that this
-  * is invoked for jobs with final runstate as SUCCESSFUL.
+  * is invoked for jobs with final runstate as SUCCESSFUL. This is called
+  * from the application master process for the entire job. This is guaranteed
+  * to only be called once. If it throws an exception the entire job will
+  * fail.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException
@ -94,7 +109,8 @@ public abstract class OutputCommitter
  /**
   * For aborting an unsuccessful job's output. Note that this is invoked for
   * jobs with final runstate as {@link JobStatus#FAILED} or
-  * {@link JobStatus#KILLED}
+  * {@link JobStatus#KILLED}. This is called from the application
+  * master process for the entire job. This may be called multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @param status final runstate of the job
@ -106,7 +122,10 @@ public abstract class OutputCommitter
  }

  /**
-  * Sets up output for the task.
+  * Sets up output for the task. This is called from each individual task's
+  * process that will output to HDFS, and it is called just for that task. This
+  * may be called multiple times for the same task, but for different task
+  * attempts.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException
@ -115,7 +134,9 @@ public abstract class OutputCommitter
      throws IOException;

  /**
-  * Check whether task needs a commit
+  * Check whether task needs a commit. This is called from each individual
+  * task's process that will output to HDFS, and it is called just for that
+  * task.
   *
   * @param taskContext
   * @return true/false
@ -125,9 +146,16 @@ public abstract class OutputCommitter
      throws IOException;

  /**
-  * To promote the task's temporary output to final output location
-  *
-  * The task's output is moved to the job's output directory.
+  * To promote the task's temporary output to final output location.
+  * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
+  * task is the task that the AM determines finished first, this method
+  * is called to commit an individual task's output. This is to mark
+  * that tasks output as complete, as {@link #commitJob(JobContext)} will
+  * also be called later on if the entire job finished successfully. This
+  * is called from a task's process. This may be called multiple times for the
+  * same task, but different task attempts. It should be very rare for this to
+  * be called multiple times and requires odd networking failures to make this
+  * happen. In the future the Hadoop framework may eliminate this race.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if commit is not
@ -136,7 +164,9 @@ public abstract class OutputCommitter
      throws IOException;

  /**
-  * Discard the task output
+  * Discard the task output. This is called from a task's process to clean
+  * up a single task's output that can not yet been committed. This may be
+  * called multiple times for the same task, but for different task attempts.
   *
   * @param taskContext
   * @throws IOException
@ -160,7 +190,8 @@ public abstract class OutputCommitter
   * The retry-count for the job will be passed via the
   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in
   * {@link TaskAttemptContext#getConfiguration()} for the
-  * <code>OutputCommitter</code>.
+  * <code>OutputCommitter</code>. This is called from the application master
+  * process, but it is called individually for each task.
   *
   * If an exception is thrown the task will be attempted again.
   *
@ -54,7 +54,11 @@ import org.apache.hadoop.classification.InterfaceStability;
 * The methods in this class can be called from several different processes and
 * from several different contexts. It is important to know which process and
 * which context each is called from. Each method should be marked accordingly
- * in its documentation.
+ * in its documentation. It is also important to note that not all methods are
+ * guaranteed to be called once and only once. If a method is not guaranteed to
+ * have this property the output committer needs to handle this appropriately.
+ * Also note it will only be in rare situations where they may be called
+ * multiple times for the same task.
 *
 * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
 * @see JobContext
@ -65,7 +69,8 @@ import org.apache.hadoop.classification.InterfaceStability;
public abstract class OutputCommitter {
  /**
   * For the framework to setup the job output during initialization. This is
-  * called from the application master process for the entire job.
+  * called from the application master process for the entire job. This will be
+  * called multiple times, once per job attempt.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException if temporary output could not be created
@ -74,7 +79,8 @@ public abstract class OutputCommitter {
  /**
   * For cleaning up the job's output after job completion. This is called
-  * from the application master process for the entire job.
+  * from the application master process for the entire job. This may be called
+  * multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException
@ -87,7 +93,9 @@ public abstract class OutputCommitter {
  /**
   * For committing job's output after successful job completion. Note that this
   * is invoked for jobs with final runstate as SUCCESSFUL. This is called
-  * from the application master process for the entire job.
+  * from the application master process for the entire job. This is guaranteed
+  * to only be called once. If it throws an exception the entire job will
+  * fail.
   *
   * @param jobContext Context of the job whose output is being written.
   * @throws IOException
@ -101,7 +109,7 @@ public abstract class OutputCommitter {
   * For aborting an unsuccessful job's output. Note that this is invoked for
   * jobs with final runstate as {@link JobStatus.State#FAILED} or
   * {@link JobStatus.State#KILLED}. This is called from the application
-  * master process for the entire job.
+  * master process for the entire job. This may be called multiple times.
   *
   * @param jobContext Context of the job whose output is being written.
   * @param state final runstate of the job
@ -114,7 +122,9 @@ public abstract class OutputCommitter {
  /**
   * Sets up output for the task. This is called from each individual task's
-  * process that will output to HDFS, and it is called just for that task.
+  * process that will output to HDFS, and it is called just for that task. This
+  * may be called multiple times for the same task, but for different task
+  * attempts.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException
@ -141,7 +151,10 @@ public abstract class OutputCommitter {
   * is called to commit an individual task's output. This is to mark
   * that tasks output as complete, as {@link #commitJob(JobContext)} will
   * also be called later on if the entire job finished successfully. This
-  * is called from a task's process.
+  * is called from a task's process. This may be called multiple times for the
+  * same task, but different task attempts. It should be very rare for this to
+  * be called multiple times and requires odd networking failures to make this
+  * happen. In the future the Hadoop framework may eliminate this race.
   *
   * @param taskContext Context of the task whose output is being written.
   * @throws IOException if commit is not successful.
@ -151,7 +164,8 @@ public abstract class OutputCommitter {
  /**
   * Discard the task output. This is called from a task's process to clean
-  * up a single task's output that can not yet been committed.
+  * up a single task's output that can not yet been committed. This may be
+  * called multiple times for the same task, but for different task attempts.
   *
   * @param taskContext
   * @throws IOException
@ -184,6 +198,9 @@ public abstract class OutputCommitter {
   *
   * If an exception is thrown the task will be attempted again.
   *
   * This may be called multiple times for the same task. But from different
   * application attempts.
   *
   * @param taskContext Context of the task whose output is being recovered
   * @throws IOException
   */
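The expanded javadoc above pins down the contract: commitJob is guaranteed to run at most once, in the application master, while setupJob, abortJob and the task-side methods may each run more than once. The following is a minimal sketch of a committer written against that contract by extending the new-API FileOutputCommitter; the subclass name and the marker file it writes are invented purely for illustration.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

public class MarkerFileCommitter extends FileOutputCommitter {

  private final Path doneMarker;   // hypothetical marker, not part of Hadoop

  public MarkerFileCommitter(Path outputPath, TaskAttemptContext context)
      throws IOException {
    super(outputPath, context);
    this.doneMarker = new Path(outputPath, "_COMMITTER_DONE");
  }

  @Override
  public void commitJob(JobContext context) throws IOException {
    // commitJob runs at most once per job, so non-idempotent finalization
    // (publishing the output, writing a completion marker) belongs here.
    super.commitJob(context);
    FileSystem fs = doneMarker.getFileSystem(context.getConfiguration());
    fs.create(doneMarker, true).close();
  }

  @Override
  public void abortJob(JobContext context, JobStatus.State state)
      throws IOException {
    // abortJob may run more than once (for example from different job
    // attempts), so it must tolerate repeated invocations.
    FileSystem fs = doneMarker.getFileSystem(context.getConfiguration());
    fs.delete(doneMarker, false);   // delete() simply returns false if gone
    super.abortJob(context, state);
  }
}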
@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
public interface HistoryEventHandler {
void handleEvent(HistoryEvent event) throws IOException;
}
@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class JobHistoryParser {
+public class JobHistoryParser implements HistoryEventHandler {

  private static final Log LOG = LogFactory.getLog(JobHistoryParser.class);
@ -94,6 +94,34 @@ public class JobHistoryParser {
    this.in = in;
  }
public synchronized void parse(HistoryEventHandler handler)
throws IOException {
parse(new EventReader(in), handler);
}
/**
* Only used for unit tests.
*/
@Private
public synchronized void parse(EventReader reader, HistoryEventHandler handler)
throws IOException {
int eventCtr = 0;
HistoryEvent event;
try {
while ((event = reader.getNextEvent()) != null) {
handler.handleEvent(event);
++eventCtr;
}
} catch (IOException ioe) {
LOG.info("Caught exception parsing history file after " + eventCtr +
" events", ioe);
parseException = ioe;
} finally {
in.close();
}
}
  /**
   * Parse the entire history file and populate the JobInfo object
   * The first invocation will populate the object, subsequent calls
@ -122,21 +150,7 @@ public class JobHistoryParser {
    }
    info = new JobInfo();
parse(reader, this);
int eventCtr = 0;
HistoryEvent event;
try {
while ((event = reader.getNextEvent()) != null) {
handleEvent(event);
++eventCtr;
}
} catch (IOException ioe) {
LOG.info("Caught exception parsing history file after " + eventCtr +
" events", ioe);
parseException = ioe;
} finally {
in.close();
}
    return info;
  }
@ -150,7 +164,8 @@ public class JobHistoryParser {
    return parseException;
  }

- private void handleEvent(HistoryEvent event) {
+ @Override
+ public void handleEvent(HistoryEvent event) {
    EventType type = event.getEventType();
    switch (type) {
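With JobHistoryParser now implementing HistoryEventHandler and exposing parse(HistoryEventHandler), callers can stream history events into their own handler instead of materializing a full JobInfo. A rough usage sketch follows; the event-counting handler and the command-line argument are made up for illustration, while the parser constructor and parse method are the ones shown in this diff.

import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;

public class EventTypeCounter implements HistoryEventHandler {
  private final Map<EventType, Integer> counts =
      new EnumMap<EventType, Integer>(EventType.class);

  @Override
  public void handleEvent(HistoryEvent event) throws IOException {
    // Tally each history event by its type as the parser streams it to us.
    EventType type = event.getEventType();
    Integer old = counts.get(type);
    counts.put(type, old == null ? 1 : old + 1);
  }

  public Map<EventType, Integer> getCounts() {
    return counts;
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path historyFile = new Path(args[0]);          // path to a .jhist file
    JobHistoryParser parser = new JobHistoryParser(fs, historyFile);
    EventTypeCounter counter = new EventTypeCounter();
    parser.parse(counter);                         // stream events to handler
    System.out.println(counter.getCounts());
  }
}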
@ -667,6 +667,9 @@ public class HistoryFileManager extends AbstractService {
          }
        });
      }
} else if (old != null && !old.isMovePending()) {
//This is a duplicate so just delete it
fileInfo.delete();
    }
  }
}
@ -125,7 +125,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
    Class<? extends Enum> type = event.getType().getDeclaringClass();
    try{
-     eventDispatchers.get(type).handle(event);
+     EventHandler handler = eventDispatchers.get(type);
+     if(handler != null) {
+       handler.handle(event);
+     } else {
+       throw new Exception("No handler for registered for " + type);
+     }
    }
    catch (Throwable t) {
      //TODO Maybe log the state of the queue
@ -286,9 +286,12 @@ public class WebAppProxyServlet extends HttpServlet {
"please try the history server"); "please try the history server");
      return;
    }
-   URI trackingUri = ProxyUriUtils.getUriFromAMUrl(
-       applicationReport.getOriginalTrackingUrl());
-   if(applicationReport.getOriginalTrackingUrl().equals("N/A")) {
+   String original = applicationReport.getOriginalTrackingUrl();
+   URI trackingUri = null;
+   if (original != null) {
+     trackingUri = ProxyUriUtils.getUriFromAMUrl(original);
+   }
+   if(original == null || original.equals("N/A")) {
      String message;
      switch(applicationReport.getFinalApplicationStatus()) {
        case FAILED: