merge MAPREDUCE-5079 from trunk. Changes job recovery to restore state directly from job history, instaed of simulating state machine events. Contributed by Jason Lowe and Robert Parker.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1466770 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-04-11 05:02:45 +00:00
parent fc570c127a
commit 422133edf7
21 changed files with 1146 additions and 770 deletions

View File

@ -23,6 +23,10 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
faster job submission. (amarrk via tgraves)
MAPREDUCE-5079. Changes job recovery to restore state directly from job
history, instaed of simulating state machine events.
(Jason Lowe and Robert Parker via sseth)
OPTIMIZATIONS
BUG FIXES

View File

@ -24,9 +24,12 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
@ -46,6 +49,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
@ -54,6 +58,9 @@ import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@ -61,6 +68,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
@ -74,6 +82,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@ -84,8 +93,6 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
@ -94,6 +101,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -167,7 +175,6 @@ public class MRAppMaster extends CompositeService {
private AppContext context;
private Dispatcher dispatcher;
private ClientService clientService;
private Recovery recoveryServ;
private ContainerAllocator containerAllocator;
private ContainerLauncher containerLauncher;
private EventHandler<CommitterEvent> committerEventHandler;
@ -180,7 +187,6 @@ public class MRAppMaster extends CompositeService {
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private JobHistoryEventHandler jobHistoryEventHandler;
private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
private Job job;
@ -193,6 +199,8 @@ public class MRAppMaster extends CompositeService {
private String shutDownMessage = null;
JobStateInternal forcedState = null;
private long recoveredJobStartTime = 0;
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime, int maxAppAttempts) {
@ -340,34 +348,9 @@ public class MRAppMaster extends CompositeService {
}
} else {
committer = createOutputCommitter(conf);
boolean recoveryEnabled = conf.getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
boolean recoverySupportedByCommitter = committer.isRecoverySupported();
// If a shuffle secret was not provided by the job client then this app
// attempt will generate one. However that disables recovery if there
// are reducers as the shuffle secret would be app attempt specific.
boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
TokenCache.getShuffleSecretKey(fsTokens) != null);
if (recoveryEnabled && recoverySupportedByCommitter
&& shuffleKeyValidForRecovery && appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
recoveryServ = createRecoveryService(context);
addIfService(recoveryServ);
dispatcher = recoveryServ.getDispatcher();
clock = recoveryServ.getClock();
inRecovery = true;
} else {
LOG.info("Not starting RecoveryService: recoveryEnabled: "
+ recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId());
dispatcher = createDispatcher();
addIfService(dispatcher);
}
dispatcher = createDispatcher();
addIfService(dispatcher);
//service to handle requests from JobClient
clientService = createClientService(context);
@ -595,15 +578,6 @@ public class MRAppMaster extends CompositeService {
return new JobFinishEventHandler();
}
/**
* Create the recovery service.
* @return an instance of the recovery service.
*/
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryService(appContext.getApplicationAttemptId(),
appContext.getClock(), getCommitter(), isNewApiCommitter());
}
/** Create and initialize (but don't start) a single job.
* @param forcedState a state to force the job into or null for normal operation.
* @param diagnostic a diagnostic message to include with the job.
@ -615,7 +589,8 @@ public class MRAppMaster extends CompositeService {
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, newApiCommitter,
completedTasksFromPreviousRun, metrics,
committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context,
forcedState, diagnostic);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
@ -978,18 +953,8 @@ public class MRAppMaster extends CompositeService {
public void start() {
amInfos = new LinkedList<AMInfo>();
// Pull completedTasks etc from recovery
if (inRecovery) {
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
amInfos = recoveryServ.getAMInfos();
} else {
// Get the amInfos anyways irrespective of whether recovery is enabled or
// not IF this is not the first AM generation
if (appAttemptID.getAttemptId() != 1) {
amInfos.addAll(readJustAMInfos());
}
}
completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
processRecovery();
// Current an AMInfo for the current AM generation.
AMInfo amInfo =
@ -1051,13 +1016,105 @@ public class MRAppMaster extends CompositeService {
startJobs();
}
private void processRecovery() {
if (appAttemptID.getAttemptId() == 1) {
return; // no need to recover on the first attempt
}
boolean recoveryEnabled = getConfig().getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
boolean recoverySupportedByCommitter =
committer != null && committer.isRecoverySupported();
// If a shuffle secret was not provided by the job client then this app
// attempt will generate one. However that disables recovery if there
// are reducers as the shuffle secret would be app attempt specific.
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
TokenCache.getShuffleSecretKey(fsTokens) != null);
if (recoveryEnabled && recoverySupportedByCommitter
&& shuffleKeyValidForRecovery) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
try {
parsePreviousJobHistory();
} catch (IOException e) {
LOG.warn("Unable to parse prior job history, aborting recovery", e);
// try to get just the AMInfos
amInfos.addAll(readJustAMInfos());
}
} else {
LOG.info("Will not try to recover. recoveryEnabled: "
+ recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId());
// Get the amInfos anyways whether recovery is enabled or not
amInfos.addAll(readJustAMInfos());
}
}
private static FSDataInputStream getPreviousJobHistoryStream(
Configuration conf, ApplicationAttemptId appAttemptId)
throws IOException {
Path historyFile = JobHistoryUtils.getPreviousJobHistoryPath(conf,
appAttemptId);
LOG.info("Previous history file is at " + historyFile);
return historyFile.getFileSystem(conf).open(historyFile);
}
private void parsePreviousJobHistory() throws IOException {
FSDataInputStream in = getPreviousJobHistoryStream(getConfig(),
appAttemptID);
JobHistoryParser parser = new JobHistoryParser(in);
JobInfo jobInfo = parser.parse();
Exception parseException = parser.getParseException();
if (parseException != null) {
LOG.info("Got an error parsing job-history file" +
", ignoring incomplete events.", parseException);
}
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
.getAllTasks();
for (TaskInfo taskInfo : taskInfos.values()) {
if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
taskInfo.getAllTaskAttempts().entrySet().iterator();
while (taskAttemptIterator.hasNext()) {
Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
taskAttemptIterator.remove();
}
}
completedTasksFromPreviousRun
.put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
LOG.info("Read from history task "
+ TypeConverter.toYarn(taskInfo.getTaskId()));
}
}
LOG.info("Read completed tasks from history "
+ completedTasksFromPreviousRun.size());
recoveredJobStartTime = jobInfo.getLaunchTime();
// recover AMInfos
List<JobHistoryParser.AMInfo> jhAmInfoList = jobInfo.getAMInfos();
if (jhAmInfoList != null) {
for (JobHistoryParser.AMInfo jhAmInfo : jhAmInfoList) {
AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
jhAmInfo.getNodeManagerHttpPort());
amInfos.add(amInfo);
}
}
}
private List<AMInfo> readJustAMInfos() {
List<AMInfo> amInfos = new ArrayList<AMInfo>();
FSDataInputStream inputStream = null;
try {
inputStream =
RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
appAttemptID);
inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
EventReader jobHistoryEventReader = new EventReader(inputStream);
// All AMInfos are contiguous. Track when the first AMStartedEvent
@ -1108,7 +1165,8 @@ public class MRAppMaster extends CompositeService {
@SuppressWarnings("unchecked")
protected void startJobs() {
/** create a job-start event to get this ball rolling */
JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
JobEvent startJobEvent = new JobStartEvent(job.getID(),
recoveredJobStartTime);
/** send the job-start event. this triggers the job execution. */
dispatcher.getEventHandler().handle(startJobEvent);
}

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -15,6 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
package org.apache.hadoop.mapreduce.v2.app.recover;
import org.apache.hadoop.classification.InterfaceAudience;
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
public class JobStartEvent extends JobEvent {
long recoveredJobStartTime;
public JobStartEvent(JobId jobID) {
this(jobID, 0);
}
public JobStartEvent(JobId jobID, long recoveredJobStartTime) {
super(jobID, JobEventType.JOB_START);
this.recoveredJobStartTime = recoveredJobStartTime;
}
public long getRecoveredJobStartTime() {
return recoveredJobStartTime;
}
}

View File

@ -26,6 +26,7 @@ public enum TaskAttemptEventType {
//Producer:Task
TA_SCHEDULE,
TA_RESCHEDULE,
TA_RECOVER,
//Producer:Client, Task
TA_KILL,

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
public class TaskAttemptRecoverEvent extends TaskAttemptEvent {
private TaskAttemptInfo taInfo;
private OutputCommitter committer;
private boolean recoverAttemptOutput;
public TaskAttemptRecoverEvent(TaskAttemptId id, TaskAttemptInfo taInfo,
OutputCommitter committer, boolean recoverOutput) {
super(id, TaskAttemptEventType.TA_RECOVER);
this.taInfo = taInfo;
this.committer = committer;
this.recoverAttemptOutput = recoverOutput;
}
public TaskAttemptInfo getTaskAttemptInfo() {
return taInfo;
}
public OutputCommitter getCommitter() {
return committer;
}
public boolean getRecoverOutput() {
return recoverAttemptOutput;
}
}

View File

@ -28,6 +28,7 @@ public enum TaskEventType {
//Producer:Job
T_SCHEDULE,
T_RECOVER,
//Producer:Speculator
T_ADD_SPEC_ATTEMPT,

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
public class TaskRecoverEvent extends TaskEvent {
private TaskInfo taskInfo;
private OutputCommitter committer;
private boolean recoverTaskOutput;
public TaskRecoverEvent(TaskId taskID, TaskInfo taskInfo,
OutputCommitter committer, boolean recoverTaskOutput) {
super(taskID, TaskEventType.T_RECOVER);
this.taskInfo = taskInfo;
this.committer = committer;
this.recoverTaskOutput = recoverTaskOutput;
}
public TaskInfo getTaskInfo() {
return taskInfo;
}
public OutputCommitter getOutputCommitter() {
return committer;
}
public boolean getRecoverTaskOutput() {
return recoverTaskOutput;
}
}

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -92,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
@ -101,6 +103,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@ -159,6 +162,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private final Lock writeLock;
private final JobId jobId;
private final String jobName;
private final OutputCommitter committer;
private final boolean newApiCommitter;
private final org.apache.hadoop.mapreduce.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
@ -602,7 +606,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
boolean newApiCommitter, String userName,
OutputCommitter committer, boolean newApiCommitter, String userName,
long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
JobStateInternal forcedState, String forcedDiagnostic) {
this.applicationAttemptId = applicationAttemptId;
@ -618,6 +622,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
this.appSubmitTime = appSubmitTime;
this.oldJobId = TypeConverter.fromYarn(jobId);
this.committer = committer;
this.newApiCommitter = newApiCommitter;
this.taskAttemptListener = taskAttemptListener;
@ -888,10 +893,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
}
}
protected void scheduleTasks(Set<TaskId> taskIDs) {
protected void scheduleTasks(Set<TaskId> taskIDs,
boolean recoverTaskOutput) {
for (TaskId taskID : taskIDs) {
eventHandler.handle(new TaskEvent(taskID,
TaskEventType.T_SCHEDULE));
TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
if (taskInfo != null) {
eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
committer, recoverTaskOutput));
} else {
eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
}
}
}
@ -1421,7 +1432,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.conf, splits[i],
job.taskAttemptListener,
job.jobToken, job.fsTokens,
job.clock, job.completedTasksFromPreviousRun,
job.clock,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
@ -1439,7 +1450,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
job.conf, job.numMapTasks,
job.taskAttemptListener, job.jobToken,
job.fsTokens, job.clock,
job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
@ -1475,8 +1485,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
@Override
public void transition(JobImpl job, JobEvent event) {
job.setupProgress = 1.0f;
job.scheduleTasks(job.mapTasks); // schedule (i.e., start) the maps
job.scheduleTasks(job.reduceTasks);
job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
job.scheduleTasks(job.reduceTasks, true);
// If we have no tasks, just transition to job completed
if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
@ -1507,7 +1517,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
*/
@Override
public void transition(JobImpl job, JobEvent event) {
job.startTime = job.clock.getTime();
JobStartEvent jse = (JobStartEvent) event;
if (jse.getRecoveredJobStartTime() != 0) {
job.startTime = jse.getRecoveredJobStartTime();
} else {
job.startTime = job.clock.getTime();
}
JobInitedEvent jie =
new JobInitedEvent(job.oldJobId,
job.startTime,

View File

@ -18,17 +18,13 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@ -49,11 +45,10 @@ public class MapTaskImpl extends TaskImpl {
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics, AppContext appContext) {
int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
conf, taskAttemptListener, jobToken, credentials, clock,
completedTasksFromPreviousRun, startCount, metrics, appContext);
appAttemptId, metrics, appContext);
this.taskSplitMetaInfo = taskSplitMetaInfo;
}

View File

@ -18,16 +18,12 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@ -47,11 +43,10 @@ public class ReduceTaskImpl extends TaskImpl {
int numMapTasks, TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics, AppContext appContext) {
int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
taskAttemptListener, jobToken, credentials, clock,
completedTasksFromPreviousRun, startCount, metrics, appContext);
appAttemptId, metrics, appContext);
this.numMapTasks = numMapTasks;
}

View File

@ -56,10 +56,12 @@ import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
@ -89,6 +91,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
@ -204,6 +207,11 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_KILL, new KilledTransition())
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
.addTransition(TaskAttemptStateInternal.NEW,
EnumSet.of(TaskAttemptStateInternal.FAILED,
TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.SUCCEEDED),
TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
.addTransition(TaskAttemptStateInternal.NEW,
TaskAttemptStateInternal.NEW,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
@ -1082,6 +1090,102 @@ public abstract class TaskAttemptImpl implements
this.avataar = avataar;
}
@SuppressWarnings("unchecked")
public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
OutputCommitter committer, boolean recoverOutput) {
containerID = taInfo.getContainerId();
containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
+ taInfo.getPort());
containerMgrAddress = StringInterner.weakIntern(
containerNodeId.toString());
nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
+ taInfo.getHttpPort());
computeRackAndLocality();
launchTime = taInfo.getStartTime();
finishTime = (taInfo.getFinishTime() != -1) ?
taInfo.getFinishTime() : clock.getTime();
shufflePort = taInfo.getShufflePort();
trackerName = taInfo.getHostname();
httpPort = taInfo.getHttpPort();
sendLaunchedEvents();
reportedStatus.id = attemptId;
reportedStatus.progress = 1.0f;
reportedStatus.counters = taInfo.getCounters();
reportedStatus.stateString = taInfo.getState();
reportedStatus.phase = Phase.CLEANUP;
reportedStatus.mapFinishTime = taInfo.getMapFinishTime();
reportedStatus.shuffleFinishTime = taInfo.getShuffleFinishTime();
reportedStatus.sortFinishTime = taInfo.getSortFinishTime();
addDiagnosticInfo(taInfo.getError());
boolean needToClean = false;
String recoveredState = taInfo.getTaskStatus();
if (recoverOutput
&& TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attemptId));
try {
committer.recoverTask(tac);
LOG.info("Recovered output from task attempt " + attemptId);
} catch (Exception e) {
LOG.error("Unable to recover task attempt " + attemptId, e);
LOG.info("Task attempt " + attemptId + " will be recovered as KILLED");
recoveredState = TaskAttemptState.KILLED.toString();
needToClean = true;
}
}
TaskAttemptStateInternal attemptState;
if (TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
attemptState = TaskAttemptStateInternal.SUCCEEDED;
reportedStatus.taskState = TaskAttemptState.SUCCEEDED;
eventHandler.handle(createJobCounterUpdateEventTASucceeded(this));
logAttemptFinishedEvent(attemptState);
} else if (TaskAttemptState.FAILED.toString().equals(recoveredState)) {
attemptState = TaskAttemptStateInternal.FAILED;
reportedStatus.taskState = TaskAttemptState.FAILED;
eventHandler.handle(createJobCounterUpdateEventTAFailed(this, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(this,
TaskAttemptStateInternal.FAILED);
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
} else {
if (!TaskAttemptState.KILLED.toString().equals(recoveredState)) {
if (String.valueOf(recoveredState).isEmpty()) {
LOG.info("TaskAttempt" + attemptId
+ " had not completed, recovering as KILLED");
} else {
LOG.warn("TaskAttempt " + attemptId + " found in unexpected state "
+ recoveredState + ", recovering as KILLED");
}
addDiagnosticInfo("Killed during application recovery");
needToClean = true;
}
attemptState = TaskAttemptStateInternal.KILLED;
reportedStatus.taskState = TaskAttemptState.KILLED;
eventHandler.handle(createJobCounterUpdateEventTAKilled(this, false));
TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(this,
TaskAttemptStateInternal.KILLED);
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
}
if (needToClean) {
TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attemptId));
try {
committer.abortTask(tac);
} catch (Exception e) {
LOG.warn("Task cleanup failed for attempt " + attemptId, e);
}
}
return attemptState;
}
private static TaskAttemptState getExternalState(
TaskAttemptStateInternal smState) {
switch (smState) {
@ -1122,6 +1226,24 @@ public abstract class TaskAttemptImpl implements
}
}
private void computeRackAndLocality() {
nodeRackName = RackResolver.resolve(
containerNodeId.getHost()).getNetworkLocation();
locality = Locality.OFF_SWITCH;
if (dataLocalHosts.size() > 0) {
String cHost = resolveHost(containerNodeId.getHost());
if (dataLocalHosts.contains(cHost)) {
locality = Locality.NODE_LOCAL;
}
}
if (locality == Locality.OFF_SWITCH) {
if (dataLocalRacks.contains(nodeRackName)) {
locality = Locality.RACK_LOCAL;
}
}
}
private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
int slotMemoryReq =
@ -1141,6 +1263,18 @@ public abstract class TaskAttemptImpl implements
return slotMillisIncrement;
}
private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
TaskAttemptImpl taskAttempt) {
long slotMillis = computeSlotMillis(taskAttempt);
TaskId taskId = taskAttempt.attemptId.getTaskId();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
jce.addCounterUpdate(
taskId.getTaskType() == TaskType.MAP ?
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
slotMillis);
return jce;
}
private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
@ -1210,6 +1344,26 @@ public abstract class TaskAttemptImpl implements
return tauce;
}
@SuppressWarnings("unchecked")
private void sendLaunchedEvents() {
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
.getJobId());
jce.addCounterUpdate(attemptId.getTaskId().getTaskType() == TaskType.MAP ?
JobCounter.TOTAL_LAUNCHED_MAPS : JobCounter.TOTAL_LAUNCHED_REDUCES, 1);
eventHandler.handle(jce);
LOG.info("TaskAttempt: [" + attemptId
+ "] using containerId: [" + containerID + " on NM: ["
+ containerMgrAddress + "]");
TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
launchTime, trackerName, httpPort, shufflePort, containerID,
locality.toString(), avataar.toString());
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
}
private WrappedProgressSplitsBlock getProgressSplitBlock() {
readLock.lock();
try {
@ -1342,8 +1496,6 @@ public abstract class TaskAttemptImpl implements
taskAttempt.containerNodeId.toString());
taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
cEvent.getContainer().getNodeHttpAddress());
taskAttempt.nodeRackName = RackResolver.resolve(
taskAttempt.containerNodeId.getHost()).getNetworkLocation();
taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
taskAttempt.assignedCapability = cEvent.getContainer().getResource();
// this is a _real_ Task (classic Hadoop mapred flavor):
@ -1354,19 +1506,7 @@ public abstract class TaskAttemptImpl implements
taskAttempt.taskAttemptListener.registerPendingTask(
taskAttempt.remoteTask, taskAttempt.jvmID);
taskAttempt.locality = Locality.OFF_SWITCH;
if (taskAttempt.dataLocalHosts.size() > 0) {
String cHost = taskAttempt.resolveHost(
taskAttempt.containerNodeId.getHost());
if (taskAttempt.dataLocalHosts.contains(cHost)) {
taskAttempt.locality = Locality.NODE_LOCAL;
}
}
if (taskAttempt.locality == Locality.OFF_SWITCH) {
if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) {
taskAttempt.locality = Locality.RACK_LOCAL;
}
}
taskAttempt.computeRackAndLocality();
//launch the container
//create the container object to be launched for a given Task attempt
@ -1471,27 +1611,7 @@ public abstract class TaskAttemptImpl implements
// Costly?
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
JobCounterUpdateEvent jce =
new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
.getJobId());
jce.addCounterUpdate(
taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ?
JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES
, 1);
taskAttempt.eventHandler.handle(jce);
LOG.info("TaskAttempt: [" + taskAttempt.attemptId
+ "] using containerId: [" + taskAttempt.containerID + " on NM: ["
+ taskAttempt.containerMgrAddress + "]");
TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
taskAttempt.launchTime,
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
taskAttempt.shufflePort, taskAttempt.containerID,
taskAttempt.locality.toString(), taskAttempt.avataar.toString());
taskAttempt.eventHandler.handle
(new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
taskAttempt.sendLaunchedEvents();
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.attemptId, true, taskAttempt.clock.getTime()));
@ -1540,14 +1660,8 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEvent event) {
//set the finish time
taskAttempt.setFinishTime();
long slotMillis = computeSlotMillis(taskAttempt);
TaskId taskId = taskAttempt.attemptId.getTaskId();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
jce.addCounterUpdate(
taskId.getTaskType() == TaskType.MAP ?
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
slotMillis);
taskAttempt.eventHandler.handle(jce);
taskAttempt.eventHandler.handle(
createJobCounterUpdateEventTASucceeded(taskAttempt));
taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
@ -1585,6 +1699,18 @@ public abstract class TaskAttemptImpl implements
}
}
private static class RecoverTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@Override
public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
TaskAttemptRecoverEvent tare = (TaskAttemptRecoverEvent) event;
return taskAttempt.recover(tare.getTaskAttemptInfo(),
tare.getCommitter(), tare.getRecoverOutput());
}
}
@SuppressWarnings({ "unchecked" })
private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
//Log finished events only if an attempt started.

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
@ -37,7 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
@ -69,8 +70,10 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
@ -152,6 +155,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
TaskEventType.T_KILL, new KillNewTransition())
.addTransition(TaskStateInternal.NEW,
EnumSet.of(TaskStateInternal.FAILED,
TaskStateInternal.KILLED,
TaskStateInternal.RUNNING,
TaskStateInternal.SUCCEEDED),
TaskEventType.T_RECOVER, new RecoverTransition())
// Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING
@ -250,20 +259,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
// By default, the next TaskAttempt number is zero. Changes during recovery
protected int nextAttemptNumber = 0;
private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
new ArrayList<TaskAttemptInfo>();
private static final class RecoverdAttemptsComparator implements
Comparator<TaskAttemptInfo> {
@Override
public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
long diff = attempt1.getStartTime() - attempt2.getStartTime();
return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
}
}
private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
new RecoverdAttemptsComparator();
// For sorting task attempts by completion time
private static final Comparator<TaskAttemptInfo> TA_INFO_COMPARATOR =
new Comparator<TaskAttemptInfo>() {
@Override
public int compare(TaskAttemptInfo a, TaskAttemptInfo b) {
long diff = a.getFinishTime() - b.getFinishTime();
return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
}
};
@Override
public TaskState getState() {
@ -280,8 +285,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics, AppContext appContext) {
int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
this.conf = conf;
this.clock = clock;
this.jobFile = remoteJobConfFile;
@ -307,41 +311,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
// See if this is from a previous generation.
if (completedTasksFromPreviousRun != null
&& completedTasksFromPreviousRun.containsKey(taskId)) {
// This task has TaskAttempts from previous generation. We have to replay
// them.
LOG.info("Task is from previous run " + taskId);
TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
taskInfo.getAllTaskAttempts();
taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
Collections.sort(taskAttemptsFromPreviousGeneration,
RECOVERED_ATTEMPTS_COMPARATOR);
}
if (taskAttemptsFromPreviousGeneration.isEmpty()) {
// All the previous attempts are exhausted, now start with a new
// generation.
// All the new TaskAttemptIDs are generated based on MR
// ApplicationAttemptID so that attempts from previous lives don't
// over-step the current one. This assumes that a task won't have more
// than 1000 attempts in its single generation, which is very reasonable.
// Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
// and requires serious medical attention.
nextAttemptNumber = (startCount - 1) * 1000;
} else {
// There are still some TaskAttempts from previous generation, use them
nextAttemptNumber =
taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
}
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
// All the new TaskAttemptIDs are generated based on MR
// ApplicationAttemptID so that attempts from previous lives don't
// over-step the current one. This assumes that a task won't have more
// than 1000 attempts in its single generation, which is very reasonable.
nextAttemptNumber = (appAttemptId - 1) * 1000;
}
@Override
@ -600,14 +578,28 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
// This is always called in the Write Lock
private void addAndScheduleAttempt(Avataar avataar) {
TaskAttempt attempt = createAttempt();
((TaskAttemptImpl) attempt).setAvataar(avataar);
TaskAttempt attempt = addAttempt(avataar);
inProgressAttempts.add(attempt.getID());
//schedule the nextAttemptNumber
if (failedAttempts.size() > 0) {
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_RESCHEDULE));
} else {
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_SCHEDULE));
}
}
private TaskAttemptImpl addAttempt(Avataar avataar) {
TaskAttemptImpl attempt = createAttempt();
attempt.setAvataar(avataar);
if (LOG.isDebugEnabled()) {
LOG.debug("Created attempt " + attempt.getID());
}
switch (attempts.size()) {
case 0:
attempts = Collections.singletonMap(attempt.getID(), attempt);
attempts = Collections.singletonMap(attempt.getID(),
(TaskAttempt) attempt);
break;
case 1:
@ -623,24 +615,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
break;
}
// Update nextATtemptNumber
if (taskAttemptsFromPreviousGeneration.isEmpty()) {
++nextAttemptNumber;
} else {
// There are still some TaskAttempts from previous generation, use them
nextAttemptNumber =
taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
}
inProgressAttempts.add(attempt.getID());
//schedule the nextAttemptNumber
if (failedAttempts.size() > 0) {
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_RESCHEDULE));
} else {
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_SCHEDULE));
}
++nextAttemptNumber;
return attempt;
}
@Override
@ -705,6 +681,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
private void sendTaskStartedEvent() {
TaskStartedEvent tse = new TaskStartedEvent(
TypeConverter.fromYarn(taskId), getLaunchTime(),
TypeConverter.fromYarn(taskId.getTaskType()),
getSplitsAsString());
eventHandler
.handle(new JobHistoryEvent(taskId.getJobId(), tse));
historyTaskStartGenerated = true;
}
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
@ -740,6 +726,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
task.successfulAttempt = null;
}
private void sendTaskSucceededEvents() {
eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
LOG.info("Task succeeded with attempt " + successfulAttempt);
if (historyTaskStartGenerated) {
TaskFinishedEvent tfe = createTaskFinishedEvent(this,
TaskStateInternal.SUCCEEDED);
eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
}
}
/**
* @return a String representation of the splits.
*
@ -751,6 +747,122 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
return "";
}
/**
* Recover a completed task from a previous application attempt
* @param taskInfo recovered info about the task
* @param recoverTaskOutput whether to recover task outputs
* @return state of the task after recovery
*/
private TaskStateInternal recover(TaskInfo taskInfo,
OutputCommitter committer, boolean recoverTaskOutput) {
LOG.info("Recovering task " + taskId
+ " from prior app attempt, status was " + taskInfo.getTaskStatus());
scheduledTime = taskInfo.getStartTime();
sendTaskStartedEvent();
Collection<TaskAttemptInfo> attemptInfos =
taskInfo.getAllTaskAttempts().values();
if (attemptInfos.size() > 0) {
metrics.launchedTask(this);
}
// recover the attempts for this task in the order they finished
// so task attempt completion events are ordered properly
int savedNextAttemptNumber = nextAttemptNumber;
ArrayList<TaskAttemptInfo> taInfos =
new ArrayList<TaskAttemptInfo>(taskInfo.getAllTaskAttempts().values());
Collections.sort(taInfos, TA_INFO_COMPARATOR);
for (TaskAttemptInfo taInfo : taInfos) {
nextAttemptNumber = taInfo.getAttemptId().getId();
TaskAttemptImpl attempt = addAttempt(Avataar.VIRGIN);
// handle the recovery inline so attempts complete before task does
attempt.handle(new TaskAttemptRecoverEvent(attempt.getID(), taInfo,
committer, recoverTaskOutput));
finishedAttempts.add(attempt.getID());
TaskAttemptCompletionEventStatus taces = null;
TaskAttemptState attemptState = attempt.getState();
switch (attemptState) {
case FAILED:
taces = TaskAttemptCompletionEventStatus.FAILED;
break;
case KILLED:
taces = TaskAttemptCompletionEventStatus.KILLED;
break;
case SUCCEEDED:
taces = TaskAttemptCompletionEventStatus.SUCCEEDED;
break;
default:
throw new IllegalStateException(
"Unexpected attempt state during recovery: " + attemptState);
}
if (attemptState == TaskAttemptState.FAILED) {
failedAttempts.add(attempt.getID());
if (failedAttempts.size() >= maxAttempts) {
taces = TaskAttemptCompletionEventStatus.TIPFAILED;
}
}
// don't clobber the successful attempt completion event
// TODO: this shouldn't be necessary after MAPREDUCE-4330
if (successfulAttempt == null) {
handleTaskAttemptCompletion(attempt.getID(), taces);
if (attemptState == TaskAttemptState.SUCCEEDED) {
successfulAttempt = attempt.getID();
}
}
}
nextAttemptNumber = savedNextAttemptNumber;
TaskStateInternal taskState = TaskStateInternal.valueOf(
taskInfo.getTaskStatus());
switch (taskState) {
case SUCCEEDED:
if (successfulAttempt != null) {
sendTaskSucceededEvents();
} else {
LOG.info("Missing successful attempt for task " + taskId
+ ", recovering as RUNNING");
// there must have been a fetch failure and the retry wasn't complete
taskState = TaskStateInternal.RUNNING;
metrics.runningTask(this);
addAndScheduleAttempt(Avataar.VIRGIN);
}
break;
case FAILED:
case KILLED:
{
if (taskState == TaskStateInternal.KILLED && attemptInfos.size() == 0) {
metrics.endWaitingTask(this);
}
TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
taskInfo.getFinishTime(), taskInfo.getTaskType(),
taskInfo.getError(), taskInfo.getTaskStatus(),
taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
eventHandler.handle(
new JobTaskEvent(taskId, getExternalState(taskState)));
break;
}
default:
throw new java.lang.AssertionError("Unexpected recovered task state: "
+ taskState);
}
return taskState;
}
private static class RecoverTransition
implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
TaskRecoverEvent tre = (TaskRecoverEvent) event;
return task.recover(tre.getTaskInfo(), tre.getOutputCommitter(),
tre.getRecoverTaskOutput());
}
}
private static class InitialScheduleTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@ -758,13 +870,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
public void transition(TaskImpl task, TaskEvent event) {
task.addAndScheduleAttempt(Avataar.VIRGIN);
task.scheduledTime = task.clock.getTime();
TaskStartedEvent tse = new TaskStartedEvent(
TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
TypeConverter.fromYarn(task.taskId.getTaskType()),
task.getSplitsAsString());
task.eventHandler
.handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
task.historyTaskStartGenerated = true;
task.sendTaskStartedEvent();
}
}
@ -818,16 +924,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
task.finishedAttempts.add(taskAttemptId);
task.inProgressAttempts.remove(taskAttemptId);
task.successfulAttempt = taskAttemptId;
task.eventHandler.handle(new JobTaskEvent(
task.taskId, TaskState.SUCCEEDED));
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
// issue kill to all other attempts
if (task.historyTaskStartGenerated) {
TaskFinishedEvent tfe = createTaskFinishedEvent(task,
TaskStateInternal.SUCCEEDED);
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
tfe));
}
task.sendTaskSucceededEvents();
for (TaskAttempt attempt : task.attempts.values()) {
if (attempt.getID() != task.successfulAttempt &&
// This is okay because it can only talk us out of sending a

View File

@ -1,39 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.recover;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.Dispatcher;
public interface Recovery {
Dispatcher getDispatcher();
Clock getClock();
Map<TaskId, TaskInfo> getCompletedTasks();
List<AMInfo> getAMInfos();
}

View File

@ -1,480 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.recover;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
/*
* Recovers the completed tasks from the previous life of Application Master.
* The completed tasks are deciphered from the history file of the previous life.
* Recovery service intercepts and replay the events for completed tasks.
* While recovery is in progress, the scheduling of new tasks are delayed by
* buffering the task schedule events.
* The recovery service controls the clock while recovery is in progress.
*/
//TODO:
//task cleanup for all non completed tasks
public class RecoveryService extends CompositeService implements Recovery {
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
private final ApplicationAttemptId applicationAttemptId;
private final OutputCommitter committer;
private final boolean newApiCommitter;
private final Dispatcher dispatcher;
private final ControlledClock clock;
private JobInfo jobInfo = null;
private final Map<TaskId, TaskInfo> completedTasks =
new HashMap<TaskId, TaskInfo>();
private final List<TaskEvent> pendingTaskScheduleEvents =
new ArrayList<TaskEvent>();
private volatile boolean recoveryMode = false;
public RecoveryService(ApplicationAttemptId applicationAttemptId,
Clock clock, OutputCommitter committer, boolean newApiCommitter) {
super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId;
this.committer = committer;
this.newApiCommitter = newApiCommitter;
this.dispatcher = createRecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
}
@Override
public void init(Configuration conf) {
super.init(conf);
// parse the history file
try {
parse();
} catch (Exception e) {
LOG.warn(e);
LOG.warn("Could not parse the old history file. Aborting recovery. "
+ "Starting afresh.", e);
}
if (completedTasks.size() > 0) {
recoveryMode = true;
LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+ "TO RECOVER " + completedTasks.size());
LOG.info("Job launch time " + jobInfo.getLaunchTime());
clock.setTime(jobInfo.getLaunchTime());
}
}
@Override
public Dispatcher getDispatcher() {
return dispatcher;
}
@Override
public Clock getClock() {
return clock;
}
@Override
public Map<TaskId, TaskInfo> getCompletedTasks() {
return completedTasks;
}
@Override
public List<AMInfo> getAMInfos() {
if (jobInfo == null || jobInfo.getAMInfos() == null) {
return new LinkedList<AMInfo>();
}
List<AMInfo> amInfos = new LinkedList<AMInfo>();
for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
.getAMInfos()) {
AMInfo amInfo =
MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
jhAmInfo.getNodeManagerHttpPort());
amInfos.add(amInfo);
}
return amInfos;
}
private void parse() throws IOException {
FSDataInputStream in =
getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
Exception parseException = parser.getParseException();
if (parseException != null) {
LOG.info("Got an error parsing job-history file" +
", ignoring incomplete events.", parseException);
}
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
.getAllTasks();
for (TaskInfo taskInfo : taskInfos.values()) {
if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
taskInfo.getAllTaskAttempts().entrySet().iterator();
while (taskAttemptIterator.hasNext()) {
Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
taskAttemptIterator.remove();
}
}
completedTasks
.put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
LOG.info("Read from history task "
+ TypeConverter.toYarn(taskInfo.getTaskId()));
}
}
LOG.info("Read completed tasks from history "
+ completedTasks.size());
}
public static FSDataInputStream getPreviousJobHistoryFileStream(
Configuration conf, ApplicationAttemptId applicationAttemptId)
throws IOException {
FSDataInputStream in = null;
Path historyFile = null;
String jobId =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
.toString();
String jobhistoryDir =
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
Path histDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
LOG.info("Trying file " + histDirPath.toString());
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
// read the previous history file
historyFile =
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
jobId, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
return in;
}
protected Dispatcher createRecoveryDispatcher() {
return new RecoveryDispatcher();
}
@SuppressWarnings("rawtypes")
class RecoveryDispatcher extends AsyncDispatcher {
private final EventHandler actualHandler;
private final EventHandler handler;
RecoveryDispatcher() {
super();
actualHandler = super.getEventHandler();
handler = new InterceptingEventHandler(actualHandler);
}
@Override
@SuppressWarnings("unchecked")
public void dispatch(Event event) {
if (recoveryMode) {
if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
.getTaskAttemptID());
LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
clock.setTime(attInfo.getStartTime());
} else if (event.getType() == TaskAttemptEventType.TA_DONE
|| event.getType() == TaskAttemptEventType.TA_FAILMSG
|| event.getType() == TaskAttemptEventType.TA_KILL) {
TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
.getTaskAttemptID());
LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
clock.setTime(attInfo.getFinishTime());
}
else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
|| event.getType() == TaskEventType.T_ATTEMPT_KILLED
|| event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event;
LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
.getTaskId());
taskInfo.getAllTaskAttempts().remove(
TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
// remove the task info from completed tasks if all attempts are
// recovered
if (taskInfo.getAllTaskAttempts().size() == 0) {
completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
// checkForRecoveryComplete
LOG.info("CompletedTasks() " + completedTasks.size());
if (completedTasks.size() == 0) {
recoveryMode = false;
clock.reset();
LOG.info("Setting the recovery mode to false. " +
"Recovery is complete!");
// send all pending tasks schedule events
for (TaskEvent tEv : pendingTaskScheduleEvents) {
actualHandler.handle(tEv);
}
}
}
}
}
realDispatch(event);
}
public void realDispatch(Event event) {
super.dispatch(event);
}
@Override
public EventHandler getEventHandler() {
return handler;
}
}
private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
TaskInfo taskInfo = completedTasks.get(id.getTaskId());
return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
}
@SuppressWarnings({"rawtypes", "unchecked"})
private class InterceptingEventHandler implements EventHandler {
EventHandler actualHandler;
InterceptingEventHandler(EventHandler actualHandler) {
this.actualHandler = actualHandler;
}
@Override
public void handle(Event event) {
if (!recoveryMode) {
// delegate to the dispatcher one
actualHandler.handle(event);
return;
}
else if (event.getType() == TaskEventType.T_SCHEDULE) {
TaskEvent taskEvent = (TaskEvent) event;
// delay the scheduling of new tasks till previous ones are recovered
if (completedTasks.get(taskEvent.getTaskID()) == null) {
LOG.debug("Adding to pending task events "
+ taskEvent.getTaskID());
pendingTaskScheduleEvents.add(taskEvent);
return;
}
}
else if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID();
TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
LOG.debug("CONTAINER_REQ " + aId);
sendAssignedEvent(aId, attInfo);
return;
}
else if (event.getType() == CommitterEventType.TASK_ABORT) {
TaskAttemptId aId = ((CommitterTaskAbortEvent) event).getAttemptID();
LOG.debug("TASK_CLEAN");
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_CLEANUP_DONE));
return;
}
else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
.getTaskAttemptID();
TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
attInfo.getShufflePort()));
// send the status update event
sendStatusUpdateEvent(aId, attInfo);
TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
switch (state) {
case SUCCEEDED:
//recover the task output
// check the committer type and construct corresponding context
TaskAttemptContext taskContext = null;
if(newApiCommitter) {
taskContext = new TaskAttemptContextImpl(getConfig(),
attInfo.getAttemptId());
} else {
taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
TypeConverter.fromYarn(aId));
}
try {
TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
committer.recoverTask(taskContext);
LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
} else {
LOG.info("Will not try to recover output for "
+ taskContext.getTaskAttemptID());
}
} catch (IOException e) {
LOG.error("Caught an exception while trying to recover task "+aId, e);
actualHandler.handle(new JobDiagnosticsUpdateEvent(
aId.getTaskId().getJobId(), "Error in recovering task output " +
e.getMessage()));
actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
JobEventType.INTERNAL_ERROR));
}
// send the done event
LOG.info("Sending done event to recovered attempt " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_DONE));
break;
case KILLED:
LOG.info("Sending kill event to recovered attempt " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_KILL));
break;
default:
LOG.info("Sending fail event to recovered attempt " + aId);
actualHandler.handle(new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_FAILMSG));
break;
}
return;
}
else if (event.getType() ==
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
TaskAttemptId aId = ((ContainerLauncherEvent) event)
.getTaskAttemptID();
actualHandler.handle(
new TaskAttemptEvent(aId,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
return;
}
// delegate to the actual handler
actualHandler.handle(event);
}
private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
TaskAttemptInfo attemptInfo) {
LOG.info("Sending status update event to " + yarnAttemptID);
TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
taskAttemptStatus.id = yarnAttemptID;
taskAttemptStatus.progress = 1.0f;
taskAttemptStatus.stateString = attemptInfo.getTaskStatus();
// taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
taskAttemptStatus.phase = Phase.CLEANUP;
org.apache.hadoop.mapreduce.Counters cntrs = attemptInfo.getCounters();
if (cntrs == null) {
taskAttemptStatus.counters = null;
} else {
taskAttemptStatus.counters = cntrs;
}
actualHandler.handle(new TaskAttemptStatusUpdateEvent(
taskAttemptStatus.id, taskAttemptStatus));
}
private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
TaskAttemptInfo attemptInfo) {
LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = attemptInfo.getContainerId();
NodeId nodeId =
ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
+ attemptInfo.getPort());
// Resource/Priority/ApplicationACLs are only needed while launching the
// container on an NM, these are already completed tasks, so setting them
// to null
Container container = BuilderUtils.newContainer(cId, nodeId,
attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
null, null, null);
actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
container, null));
}
}
}

View File

@ -414,7 +414,8 @@ public class MRApp extends MRAppMaster {
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
isNewApiCommitter(), currentUser.getUserName(), getContext(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName(), getContext(),
forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
@ -648,12 +649,13 @@ public class MRApp extends MRAppMaster {
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
boolean newApiCommitter, String user, AppContext appContext,
OutputCommitter committer, boolean newApiCommitter,
String user, AppContext appContext,
JobStateInternal forcedState, String diagnostic) {
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock,
getCompletedTaskFromPreviousRun(), metrics,
getCompletedTaskFromPreviousRun(), metrics, committer,
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
appContext, forcedState, diagnostic);

View File

@ -18,10 +18,21 @@
package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
@ -31,36 +42,66 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.Event;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.MapTaskImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRecovery {
@ -75,6 +116,7 @@ public class TestRecovery {
private Text val1 = new Text("val1");
private Text val2 = new Text("val2");
/**
* AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
* completely disappears because of failed launch, one attempt gets killed and
@ -1011,6 +1053,423 @@ public class TestRecovery {
app.verifyCompleted();
}
@Test
public void testRecoverySuccessAttempt() {
LOG.info("--- START: testRecoverySuccessAttempt ---");
long clusterTimestamp = System.currentTimeMillis();
EventHandler mockEventHandler = mock(EventHandler.class);
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
mockEventHandler);
TaskId taskId = recoverMapTask.getID();
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
TaskID taskID = new TaskID(jobID,
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
//Mock up the TaskAttempts
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
new HashMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
TaskAttemptState.SUCCEEDED);
mockTaskAttempts.put(taId1, mockTAinfo1);
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
TaskAttemptState.FAILED);
mockTaskAttempts.put(taId2, mockTAinfo2);
OutputCommitter mockCommitter = mock (OutputCommitter.class);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
recoverMapTask.handle(
new TaskRecoverEvent(taskId, mockTaskInfo,mockCommitter, true));
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler,atLeast(1)).handle(
(org.apache.hadoop.yarn.event.Event) arg.capture());
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
new HashMap<TaskAttemptID, TaskAttemptState>();
finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
jobHistoryEvents.add(EventType.TASK_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
jobHistoryEvents.add(EventType.TASK_FINISHED);
recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
arg, jobHistoryEvents, 2L, 1L);
}
@Test
public void testRecoveryAllFailAttempts() {
LOG.info("--- START: testRecoveryAllFailAttempts ---");
long clusterTimestamp = System.currentTimeMillis();
EventHandler mockEventHandler = mock(EventHandler.class);
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
mockEventHandler);
TaskId taskId = recoverMapTask.getID();
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
TaskID taskID = new TaskID(jobID,
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
//Mock up the TaskAttempts
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
new HashMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
TaskAttemptState.FAILED);
mockTaskAttempts.put(taId1, mockTAinfo1);
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
TaskAttemptState.FAILED);
mockTaskAttempts.put(taId2, mockTAinfo2);
OutputCommitter mockCommitter = mock (OutputCommitter.class);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED");
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
recoverMapTask.handle(
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler,atLeast(1)).handle(
(org.apache.hadoop.yarn.event.Event) arg.capture());
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
new HashMap<TaskAttemptID, TaskAttemptState>();
finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
jobHistoryEvents.add(EventType.TASK_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
jobHistoryEvents.add(EventType.TASK_FAILED);
recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates,
arg, jobHistoryEvents, 2L, 2L);
}
@Test
public void testRecoveryTaskSuccessAllAttemptsFail() {
LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
long clusterTimestamp = System.currentTimeMillis();
EventHandler mockEventHandler = mock(EventHandler.class);
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
mockEventHandler);
TaskId taskId = recoverMapTask.getID();
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
TaskID taskID = new TaskID(jobID,
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
//Mock up the TaskAttempts
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
new HashMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
TaskAttemptState.FAILED);
mockTaskAttempts.put(taId1, mockTAinfo1);
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
TaskAttemptState.FAILED);
mockTaskAttempts.put(taId2, mockTAinfo2);
OutputCommitter mockCommitter = mock (OutputCommitter.class);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
recoverMapTask.handle(
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler,atLeast(1)).handle(
(org.apache.hadoop.yarn.event.Event) arg.capture());
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
new HashMap<TaskAttemptID, TaskAttemptState>();
finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
// check for one new attempt launched since successful attempt not found
TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000);
finalAttemptStates.put(taId3, TaskAttemptState.NEW);
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
jobHistoryEvents.add(EventType.TASK_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates,
arg, jobHistoryEvents, 2L, 2L);
}
@Test
public void testRecoveryTaskSuccessAllAttemptsSucceed() {
LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
long clusterTimestamp = System.currentTimeMillis();
EventHandler mockEventHandler = mock(EventHandler.class);
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
mockEventHandler);
TaskId taskId = recoverMapTask.getID();
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
TaskID taskID = new TaskID(jobID,
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
//Mock up the TaskAttempts
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
new HashMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
TaskAttemptState.SUCCEEDED);
mockTaskAttempts.put(taId1, mockTAinfo1);
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
TaskAttemptState.SUCCEEDED);
mockTaskAttempts.put(taId2, mockTAinfo2);
OutputCommitter mockCommitter = mock (OutputCommitter.class);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
recoverMapTask.handle(
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler,atLeast(1)).handle(
(org.apache.hadoop.yarn.event.Event) arg.capture());
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
new HashMap<TaskAttemptID, TaskAttemptState>();
finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
jobHistoryEvents.add(EventType.TASK_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
jobHistoryEvents.add(EventType.TASK_FINISHED);
recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
arg, jobHistoryEvents, 2L, 0L);
}
@Test
public void testRecoveryAllAttemptsKilled() {
LOG.info("--- START: testRecoveryAllAttemptsKilled ---");
long clusterTimestamp = System.currentTimeMillis();
EventHandler mockEventHandler = mock(EventHandler.class);
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
mockEventHandler);
TaskId taskId = recoverMapTask.getID();
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
TaskID taskID = new TaskID(jobID,
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
//Mock up the TaskAttempts
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
new HashMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
TaskAttemptState.KILLED);
mockTaskAttempts.put(taId1, mockTAinfo1);
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
TaskAttemptState.KILLED);
mockTaskAttempts.put(taId2, mockTAinfo2);
OutputCommitter mockCommitter = mock (OutputCommitter.class);
TaskInfo mockTaskInfo = mock(TaskInfo.class);
when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED");
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
recoverMapTask.handle(
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler,atLeast(1)).handle(
(org.apache.hadoop.yarn.event.Event) arg.capture());
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
new HashMap<TaskAttemptID, TaskAttemptState>();
finalAttemptStates.put(taId1, TaskAttemptState.KILLED);
finalAttemptStates.put(taId2, TaskAttemptState.KILLED);
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
jobHistoryEvents.add(EventType.TASK_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
jobHistoryEvents.add(EventType.TASK_FAILED);
recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates,
arg, jobHistoryEvents, 2L, 0L);
}
private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState,
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates,
ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents,
long expectedMapLaunches, long expectedFailedMaps) {
assertEquals("Final State of Task", finalState, checkTask.getState());
Map<TaskAttemptId, TaskAttempt> recoveredAttempts =
checkTask.getAttempts();
assertEquals("Expected Number of Task Attempts",
finalAttemptStates.size(), recoveredAttempts.size());
for (TaskAttemptID taID : finalAttemptStates.keySet()) {
assertEquals("Expected Task Attempt State",
finalAttemptStates.get(taID),
recoveredAttempts.get(TypeConverter.toYarn(taID)).getState());
}
Iterator<Event> ie = arg.getAllValues().iterator();
int eventNum = 0;
long totalLaunchedMaps = 0;
long totalFailedMaps = 0;
boolean jobTaskEventReceived = false;
while (ie.hasNext()) {
Object current = ie.next();
++eventNum;
LOG.info(eventNum + " " + current.getClass().getName());
if (current instanceof JobHistoryEvent) {
JobHistoryEvent jhe = (JobHistoryEvent) current;
LOG.info(expectedJobHistoryEvents.get(0).toString() + " " +
jhe.getHistoryEvent().getEventType().toString() + " " +
jhe.getJobID());
assertEquals(expectedJobHistoryEvents.get(0),
jhe.getHistoryEvent().getEventType());
expectedJobHistoryEvents.remove(0);
} else if (current instanceof JobCounterUpdateEvent) {
JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
LOG.info("JobCounterUpdateEvent "
+ jcue.getCounterUpdates().get(0).getCounterKey()
+ " " + jcue.getCounterUpdates().get(0).getIncrementValue());
if (jcue.getCounterUpdates().get(0).getCounterKey() ==
JobCounter.NUM_FAILED_MAPS) {
totalFailedMaps += jcue.getCounterUpdates().get(0)
.getIncrementValue();
} else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
JobCounter.TOTAL_LAUNCHED_MAPS) {
totalLaunchedMaps += jcue.getCounterUpdates().get(0)
.getIncrementValue();
}
} else if (current instanceof JobTaskEvent) {
JobTaskEvent jte = (JobTaskEvent) current;
assertEquals(jte.getState(), finalState);
jobTaskEventReceived = true;
}
}
assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING));
assertEquals("Did not process all expected JobHistoryEvents",
0, expectedJobHistoryEvents.size());
assertEquals("Expected Map Launches",
expectedMapLaunches, totalLaunchedMaps);
assertEquals("Expected Failed Maps",
expectedFailedMaps, totalFailedMaps);
}
private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
ApplicationId appId = BuilderUtils.newApplicationId(clusterTimestamp, 1);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
int partitions = 2;
Path remoteJobConfFile = mock(Path.class);
JobConf conf = new JobConf();
TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
Token<JobTokenIdentifier> jobToken =
(Token<JobTokenIdentifier>) mock(Token.class);
Credentials credentials = null;
Clock clock = new SystemClock();
int appAttemptId = 3;
MRAppMetrics metrics = mock(MRAppMetrics.class);
Resource minContainerRequirements = mock(Resource.class);
when(minContainerRequirements.getMemory()).thenReturn(1000);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
when(clusterInfo.getMinContainerCapability()).thenReturn(
minContainerRequirements);
AppContext appContext = mock(AppContext.class);
when(appContext.getClusterInfo()).thenReturn(clusterInfo);
TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions,
eh, remoteJobConfFile, conf,
taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
appAttemptId, metrics, appContext);
return mapTask;
}
private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai,
TaskAttemptState tas) {
ContainerId ci = mock(ContainerId.class);
Counters counters = mock(Counters.class);
TaskType tt = TaskType.MAP;
long finishTime = System.currentTimeMillis();
TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class);
when(mockTAinfo.getAttemptId()).thenReturn(tai);
when(mockTAinfo.getContainerId()).thenReturn(ci);
when(mockTAinfo.getCounters()).thenReturn(counters);
when(mockTAinfo.getError()).thenReturn("");
when(mockTAinfo.getFinishTime()).thenReturn(finishTime);
when(mockTAinfo.getHostname()).thenReturn("localhost");
when(mockTAinfo.getHttpPort()).thenReturn(23);
when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L);
when(mockTAinfo.getPort()).thenReturn(24);
when(mockTAinfo.getRackname()).thenReturn("defaultRack");
when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L);
when(mockTAinfo.getShufflePort()).thenReturn(25);
when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L);
when(mockTAinfo.getStartTime()).thenReturn(finishTime -10000);
when(mockTAinfo.getState()).thenReturn("task in progress");
when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString());
when(mockTAinfo.getTaskType()).thenReturn(tt);
when(mockTAinfo.getTrackerName()).thenReturn("TrackerName");
return mockTAinfo;
}
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
@ -1145,5 +1604,16 @@ public class TestRecovery {
public static void main(String[] arg) throws Exception {
TestRecovery test = new TestRecovery();
test.testCrashed();
test.testMultipleCrashes();
test.testOutputRecovery();
test.testOutputRecoveryMapsOnly();
test.testRecoveryWithOldCommiter();
test.testSpeculative();
test.testRecoveryWithoutShuffleSecret();
test.testRecoverySuccessAttempt();
test.testRecoveryAllFailAttempts();
test.testRecoveryTaskSuccessAllAttemptsFail();
test.testRecoveryTaskSuccessAllAttemptsSucceed();
test.testRecoveryAllAttemptsKilled();
}
}

View File

@ -316,7 +316,8 @@ import org.junit.Test;
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
isNewApiCommitter(), currentUser.getUserName(), getContext(),
getCommitter(), isNewApiCommitter(),
currentUser.getUserName(), getContext(),
forcedState, diagnostic);
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);

View File

@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus.State;
@ -47,6 +49,7 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@ -57,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
@ -69,7 +73,6 @@ import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@ -133,7 +136,7 @@ public class TestJobImpl {
JobImpl job = createStubbedJob(conf, dispatcher, 0);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
job.handle(new JobStartEvent(job.getID()));
assertJobState(job, JobStateInternal.SUCCEEDED);
dispatcher.stop();
commitHandler.stop();
@ -222,7 +225,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
@ -284,7 +287,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
@ -351,7 +354,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.FAIL_ABORT);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@ -388,7 +391,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.SETUP);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
@ -428,7 +431,7 @@ public class TestJobImpl {
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
null, null, true, null, 0, null, null, null, null);
null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -439,7 +442,7 @@ public class TestJobImpl {
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
null, null, true, null, 0, null, null, null, null);
null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -450,7 +453,7 @@ public class TestJobImpl {
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
null, null, true, null, 0, null, null, null, null);
null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -461,7 +464,7 @@ public class TestJobImpl {
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
null, null, true, null, 0, null, null, null, null);
null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -472,7 +475,7 @@ public class TestJobImpl {
// Verify access
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
null, null, true, null, 0, null, null, null, null);
null, null, null, true, null, 0, null, null, null, null);
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
@ -490,7 +493,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
mrAppMetrics, true, null, 0, null, null, null, null);
mrAppMetrics, null, true, null, 0, null, null, null, null);
job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics);
@ -501,7 +504,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
mrAppMetrics, true, null, 0, null, null, null, null);
mrAppMetrics, null, true, null, 0, null, null, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent);
diagnostics = job.getReport().getDiagnostics();
@ -556,7 +559,7 @@ public class TestJobImpl {
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
null, new JobTokenSecretManager(), new Credentials(), null, null,
mrAppMetrics, true, null, 0, null, null, null, null);
mrAppMetrics, null, true, null, 0, null, null, null, null);
InitTransition initTransition = getInitTransition(2);
JobEvent mockJobEvent = mock(JobEvent.class);
initTransition.transition(job, mockJobEvent);
@ -597,7 +600,7 @@ public class TestJobImpl {
JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
job.handle(new JobStartEvent(jobId));
assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
@ -661,7 +664,7 @@ public class TestJobImpl {
StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
job.handle(new JobStartEvent(job.getID()));
assertJobState(job, JobStateInternal.RUNNING);
return job;
}
@ -785,9 +788,9 @@ public class TestJobImpl {
boolean newApiCommitter, String user, int numSplits) {
super(jobId, applicationAttemptId, conf, eventHandler,
null, new JobTokenSecretManager(), new Credentials(),
new SystemClock(), null, MRAppMetrics.create(),
newApiCommitter, user, System.currentTimeMillis(), null, null, null,
null);
new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
MRAppMetrics.create(), null, newApiCommitter, user,
System.currentTimeMillis(), null, null, null, null);
initTransition = getInitTransition(numSplits);
localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,

View File

@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
@ -80,7 +78,6 @@ public class TestTaskImpl {
private Path remoteJobConfFile;
private Credentials credentials;
private Clock clock;
private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
private MRAppMetrics metrics;
private TaskImpl mockTask;
private ApplicationId appId;
@ -104,13 +101,12 @@ public class TestTaskImpl {
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
Credentials credentials, Clock clock, int startCount,
MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
super(jobId, taskType , partition, eventHandler,
remoteJobConfFile, conf, taskAttemptListener,
jobToken, credentials, clock,
completedTasksFromPreviousRun, startCount, metrics, appContext);
startCount, metrics, appContext);
this.taskType = taskType;
}
@ -247,8 +243,7 @@ public class TestTaskImpl {
return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock,
completedTasksFromPreviousRun, startCount,
metrics, appContext, taskType);
startCount, metrics, appContext, taskType);
}
@After
@ -652,9 +647,7 @@ public class TestTaskImpl {
public void testFailedTransitions() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock,
completedTasksFromPreviousRun, startCount,
metrics, appContext, TaskType.MAP) {
credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
@Override
protected int getMaxAttempts() {
return 1;
@ -721,9 +714,7 @@ public class TestTaskImpl {
public void testCountersWithSpeculation() {
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock,
completedTasksFromPreviousRun, startCount,
metrics, appContext, TaskType.MAP) {
credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
@Override
protected int getMaxAttempts() {
return 1;

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.google.common.base.Joiner;
@ -525,4 +526,19 @@ public class JobHistoryUtils {
sb.append(jobId.toString());
return sb.toString();
}
public static Path getPreviousJobHistoryPath(
Configuration conf, ApplicationAttemptId applicationAttemptId)
throws IOException {
String jobId =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
.toString();
String jobhistoryDir =
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
Path histDirPath = FileContext.getFileContext(conf).makeQualified(
new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1)));
}
}

View File

@ -422,6 +422,7 @@ public interface MRJobConfig {
/** Enable job recovery.*/
public static final String MR_AM_JOB_RECOVERY_ENABLE =
MR_AM_PREFIX + "job.recovery.enable";
public static final boolean MR_AM_JOB_RECOVERY_ENABLE_DEFAULT = true;
/**
* Limit on the number of reducers that can be preempted to ensure that at