MAPREDUCE-5079. Changes job recovery to restore state directly from job history, instaed of simulating state machine events. Contributed by Jason Lowe and Robert Parker.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466767 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3ddf8319ca
commit
6a1c41111e
|
@ -169,6 +169,10 @@ Release 2.0.5-alpha - UNRELEASED
|
|||
MAPREDUCE-5129. Allow tags to JobHistory for deeper analytics. (billie via
|
||||
acmurthy)
|
||||
|
||||
MAPREDUCE-5079. Changes job recovery to restore state directly from job
|
||||
history, instaed of simulating state machine events.
|
||||
(Jason Lowe and Robert Parker via sseth)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -24,9 +24,12 @@ import java.lang.reflect.Constructor;
|
|||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -46,6 +49,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
|
|||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
|
||||
|
@ -54,6 +58,9 @@ import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
|||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||
|
@ -61,6 +68,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
|||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
|
||||
|
@ -74,6 +82,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||
|
@ -84,8 +93,6 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
|||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
||||
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
|
||||
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
|
||||
|
@ -94,6 +101,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
|||
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
|
@ -167,7 +175,6 @@ public class MRAppMaster extends CompositeService {
|
|||
private AppContext context;
|
||||
private Dispatcher dispatcher;
|
||||
private ClientService clientService;
|
||||
private Recovery recoveryServ;
|
||||
private ContainerAllocator containerAllocator;
|
||||
private ContainerLauncher containerLauncher;
|
||||
private EventHandler<CommitterEvent> committerEventHandler;
|
||||
|
@ -180,7 +187,6 @@ public class MRAppMaster extends CompositeService {
|
|||
private OutputCommitter committer;
|
||||
private JobEventDispatcher jobEventDispatcher;
|
||||
private JobHistoryEventHandler jobHistoryEventHandler;
|
||||
private boolean inRecovery = false;
|
||||
private SpeculatorEventDispatcher speculatorEventDispatcher;
|
||||
|
||||
private Job job;
|
||||
|
@ -193,6 +199,8 @@ public class MRAppMaster extends CompositeService {
|
|||
private String shutDownMessage = null;
|
||||
JobStateInternal forcedState = null;
|
||||
|
||||
private long recoveredJobStartTime = 0;
|
||||
|
||||
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
|
||||
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
|
||||
long appSubmitTime, int maxAppAttempts) {
|
||||
|
@ -340,34 +348,9 @@ public class MRAppMaster extends CompositeService {
|
|||
}
|
||||
} else {
|
||||
committer = createOutputCommitter(conf);
|
||||
boolean recoveryEnabled = conf.getBoolean(
|
||||
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||
boolean recoverySupportedByCommitter = committer.isRecoverySupported();
|
||||
|
||||
// If a shuffle secret was not provided by the job client then this app
|
||||
// attempt will generate one. However that disables recovery if there
|
||||
// are reducers as the shuffle secret would be app attempt specific.
|
||||
boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
|
||||
TokenCache.getShuffleSecretKey(fsTokens) != null);
|
||||
|
||||
if (recoveryEnabled && recoverySupportedByCommitter
|
||||
&& shuffleKeyValidForRecovery && appAttemptID.getAttemptId() > 1) {
|
||||
LOG.info("Recovery is enabled. "
|
||||
+ "Will try to recover from previous life on best effort basis.");
|
||||
recoveryServ = createRecoveryService(context);
|
||||
addIfService(recoveryServ);
|
||||
dispatcher = recoveryServ.getDispatcher();
|
||||
clock = recoveryServ.getClock();
|
||||
inRecovery = true;
|
||||
} else {
|
||||
LOG.info("Not starting RecoveryService: recoveryEnabled: "
|
||||
+ recoveryEnabled + " recoverySupportedByCommitter: "
|
||||
+ recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
|
||||
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
|
||||
+ appAttemptID.getAttemptId());
|
||||
dispatcher = createDispatcher();
|
||||
addIfService(dispatcher);
|
||||
}
|
||||
dispatcher = createDispatcher();
|
||||
addIfService(dispatcher);
|
||||
|
||||
//service to handle requests from JobClient
|
||||
clientService = createClientService(context);
|
||||
|
@ -595,15 +578,6 @@ public class MRAppMaster extends CompositeService {
|
|||
return new JobFinishEventHandler();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the recovery service.
|
||||
* @return an instance of the recovery service.
|
||||
*/
|
||||
protected Recovery createRecoveryService(AppContext appContext) {
|
||||
return new RecoveryService(appContext.getApplicationAttemptId(),
|
||||
appContext.getClock(), getCommitter(), isNewApiCommitter());
|
||||
}
|
||||
|
||||
/** Create and initialize (but don't start) a single job.
|
||||
* @param forcedState a state to force the job into or null for normal operation.
|
||||
* @param diagnostic a diagnostic message to include with the job.
|
||||
|
@ -615,7 +589,8 @@ public class MRAppMaster extends CompositeService {
|
|||
Job newJob =
|
||||
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
|
||||
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
|
||||
completedTasksFromPreviousRun, metrics, newApiCommitter,
|
||||
completedTasksFromPreviousRun, metrics,
|
||||
committer, newApiCommitter,
|
||||
currentUser.getUserName(), appSubmitTime, amInfos, context,
|
||||
forcedState, diagnostic);
|
||||
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
|
||||
|
@ -978,18 +953,8 @@ public class MRAppMaster extends CompositeService {
|
|||
public void start() {
|
||||
|
||||
amInfos = new LinkedList<AMInfo>();
|
||||
|
||||
// Pull completedTasks etc from recovery
|
||||
if (inRecovery) {
|
||||
completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
|
||||
amInfos = recoveryServ.getAMInfos();
|
||||
} else {
|
||||
// Get the amInfos anyways irrespective of whether recovery is enabled or
|
||||
// not IF this is not the first AM generation
|
||||
if (appAttemptID.getAttemptId() != 1) {
|
||||
amInfos.addAll(readJustAMInfos());
|
||||
}
|
||||
}
|
||||
completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
|
||||
processRecovery();
|
||||
|
||||
// Current an AMInfo for the current AM generation.
|
||||
AMInfo amInfo =
|
||||
|
@ -1051,13 +1016,105 @@ public class MRAppMaster extends CompositeService {
|
|||
startJobs();
|
||||
}
|
||||
|
||||
private void processRecovery() {
|
||||
if (appAttemptID.getAttemptId() == 1) {
|
||||
return; // no need to recover on the first attempt
|
||||
}
|
||||
|
||||
boolean recoveryEnabled = getConfig().getBoolean(
|
||||
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
|
||||
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
|
||||
boolean recoverySupportedByCommitter =
|
||||
committer != null && committer.isRecoverySupported();
|
||||
|
||||
// If a shuffle secret was not provided by the job client then this app
|
||||
// attempt will generate one. However that disables recovery if there
|
||||
// are reducers as the shuffle secret would be app attempt specific.
|
||||
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
|
||||
boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
|
||||
TokenCache.getShuffleSecretKey(fsTokens) != null);
|
||||
|
||||
if (recoveryEnabled && recoverySupportedByCommitter
|
||||
&& shuffleKeyValidForRecovery) {
|
||||
LOG.info("Recovery is enabled. "
|
||||
+ "Will try to recover from previous life on best effort basis.");
|
||||
try {
|
||||
parsePreviousJobHistory();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to parse prior job history, aborting recovery", e);
|
||||
// try to get just the AMInfos
|
||||
amInfos.addAll(readJustAMInfos());
|
||||
}
|
||||
} else {
|
||||
LOG.info("Will not try to recover. recoveryEnabled: "
|
||||
+ recoveryEnabled + " recoverySupportedByCommitter: "
|
||||
+ recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
|
||||
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
|
||||
+ appAttemptID.getAttemptId());
|
||||
// Get the amInfos anyways whether recovery is enabled or not
|
||||
amInfos.addAll(readJustAMInfos());
|
||||
}
|
||||
}
|
||||
|
||||
private static FSDataInputStream getPreviousJobHistoryStream(
|
||||
Configuration conf, ApplicationAttemptId appAttemptId)
|
||||
throws IOException {
|
||||
Path historyFile = JobHistoryUtils.getPreviousJobHistoryPath(conf,
|
||||
appAttemptId);
|
||||
LOG.info("Previous history file is at " + historyFile);
|
||||
return historyFile.getFileSystem(conf).open(historyFile);
|
||||
}
|
||||
|
||||
private void parsePreviousJobHistory() throws IOException {
|
||||
FSDataInputStream in = getPreviousJobHistoryStream(getConfig(),
|
||||
appAttemptID);
|
||||
JobHistoryParser parser = new JobHistoryParser(in);
|
||||
JobInfo jobInfo = parser.parse();
|
||||
Exception parseException = parser.getParseException();
|
||||
if (parseException != null) {
|
||||
LOG.info("Got an error parsing job-history file" +
|
||||
", ignoring incomplete events.", parseException);
|
||||
}
|
||||
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
|
||||
.getAllTasks();
|
||||
for (TaskInfo taskInfo : taskInfos.values()) {
|
||||
if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
|
||||
Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
|
||||
taskInfo.getAllTaskAttempts().entrySet().iterator();
|
||||
while (taskAttemptIterator.hasNext()) {
|
||||
Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
|
||||
if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
|
||||
taskAttemptIterator.remove();
|
||||
}
|
||||
}
|
||||
completedTasksFromPreviousRun
|
||||
.put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
|
||||
LOG.info("Read from history task "
|
||||
+ TypeConverter.toYarn(taskInfo.getTaskId()));
|
||||
}
|
||||
}
|
||||
LOG.info("Read completed tasks from history "
|
||||
+ completedTasksFromPreviousRun.size());
|
||||
recoveredJobStartTime = jobInfo.getLaunchTime();
|
||||
|
||||
// recover AMInfos
|
||||
List<JobHistoryParser.AMInfo> jhAmInfoList = jobInfo.getAMInfos();
|
||||
if (jhAmInfoList != null) {
|
||||
for (JobHistoryParser.AMInfo jhAmInfo : jhAmInfoList) {
|
||||
AMInfo amInfo = MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
|
||||
jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
|
||||
jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
|
||||
jhAmInfo.getNodeManagerHttpPort());
|
||||
amInfos.add(amInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<AMInfo> readJustAMInfos() {
|
||||
List<AMInfo> amInfos = new ArrayList<AMInfo>();
|
||||
FSDataInputStream inputStream = null;
|
||||
try {
|
||||
inputStream =
|
||||
RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
|
||||
appAttemptID);
|
||||
inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
|
||||
EventReader jobHistoryEventReader = new EventReader(inputStream);
|
||||
|
||||
// All AMInfos are contiguous. Track when the first AMStartedEvent
|
||||
|
@ -1108,7 +1165,8 @@ public class MRAppMaster extends CompositeService {
|
|||
@SuppressWarnings("unchecked")
|
||||
protected void startJobs() {
|
||||
/** create a job-start event to get this ball rolling */
|
||||
JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
|
||||
JobEvent startJobEvent = new JobStartEvent(job.getID(),
|
||||
recoveredJobStartTime);
|
||||
/** send the job-start event. this triggers the job execution. */
|
||||
dispatcher.getEventHandler().handle(startJobEvent);
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -15,6 +15,25 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
package org.apache.hadoop.mapreduce.v2.app.recover;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.job.event;
|
||||
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
|
||||
public class JobStartEvent extends JobEvent {
|
||||
|
||||
long recoveredJobStartTime;
|
||||
|
||||
public JobStartEvent(JobId jobID) {
|
||||
this(jobID, 0);
|
||||
}
|
||||
|
||||
public JobStartEvent(JobId jobID, long recoveredJobStartTime) {
|
||||
super(jobID, JobEventType.JOB_START);
|
||||
this.recoveredJobStartTime = recoveredJobStartTime;
|
||||
}
|
||||
|
||||
public long getRecoveredJobStartTime() {
|
||||
return recoveredJobStartTime;
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ public enum TaskAttemptEventType {
|
|||
//Producer:Task
|
||||
TA_SCHEDULE,
|
||||
TA_RESCHEDULE,
|
||||
TA_RECOVER,
|
||||
|
||||
//Producer:Client, Task
|
||||
TA_KILL,
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.job.event;
|
||||
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
|
||||
public class TaskAttemptRecoverEvent extends TaskAttemptEvent {
|
||||
|
||||
private TaskAttemptInfo taInfo;
|
||||
private OutputCommitter committer;
|
||||
private boolean recoverAttemptOutput;
|
||||
|
||||
public TaskAttemptRecoverEvent(TaskAttemptId id, TaskAttemptInfo taInfo,
|
||||
OutputCommitter committer, boolean recoverOutput) {
|
||||
super(id, TaskAttemptEventType.TA_RECOVER);
|
||||
this.taInfo = taInfo;
|
||||
this.committer = committer;
|
||||
this.recoverAttemptOutput = recoverOutput;
|
||||
}
|
||||
|
||||
public TaskAttemptInfo getTaskAttemptInfo() {
|
||||
return taInfo;
|
||||
}
|
||||
|
||||
public OutputCommitter getCommitter() {
|
||||
return committer;
|
||||
}
|
||||
|
||||
public boolean getRecoverOutput() {
|
||||
return recoverAttemptOutput;
|
||||
}
|
||||
}
|
|
@ -28,6 +28,7 @@ public enum TaskEventType {
|
|||
|
||||
//Producer:Job
|
||||
T_SCHEDULE,
|
||||
T_RECOVER,
|
||||
|
||||
//Producer:Speculator
|
||||
T_ADD_SPEC_ATTEMPT,
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.job.event;
|
||||
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
|
||||
public class TaskRecoverEvent extends TaskEvent {
|
||||
|
||||
private TaskInfo taskInfo;
|
||||
private OutputCommitter committer;
|
||||
private boolean recoverTaskOutput;
|
||||
|
||||
public TaskRecoverEvent(TaskId taskID, TaskInfo taskInfo,
|
||||
OutputCommitter committer, boolean recoverTaskOutput) {
|
||||
super(taskID, TaskEventType.T_RECOVER);
|
||||
this.taskInfo = taskInfo;
|
||||
this.committer = committer;
|
||||
this.recoverTaskOutput = recoverTaskOutput;
|
||||
}
|
||||
|
||||
public TaskInfo getTaskInfo() {
|
||||
return taskInfo;
|
||||
}
|
||||
|
||||
public OutputCommitter getOutputCommitter() {
|
||||
return committer;
|
||||
}
|
||||
|
||||
public boolean getRecoverTaskOutput() {
|
||||
return recoverTaskOutput;
|
||||
}
|
||||
}
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.Counters;
|
|||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
|
@ -92,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
|
||||
|
@ -101,6 +103,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
|
@ -159,6 +162,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
private final Lock writeLock;
|
||||
private final JobId jobId;
|
||||
private final String jobName;
|
||||
private final OutputCommitter committer;
|
||||
private final boolean newApiCommitter;
|
||||
private final org.apache.hadoop.mapreduce.JobID oldJobId;
|
||||
private final TaskAttemptListener taskAttemptListener;
|
||||
|
@ -602,7 +606,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
JobTokenSecretManager jobTokenSecretManager,
|
||||
Credentials fsTokenCredentials, Clock clock,
|
||||
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, MRAppMetrics metrics,
|
||||
boolean newApiCommitter, String userName,
|
||||
OutputCommitter committer, boolean newApiCommitter, String userName,
|
||||
long appSubmitTime, List<AMInfo> amInfos, AppContext appContext,
|
||||
JobStateInternal forcedState, String forcedDiagnostic) {
|
||||
this.applicationAttemptId = applicationAttemptId;
|
||||
|
@ -618,6 +622,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default");
|
||||
this.appSubmitTime = appSubmitTime;
|
||||
this.oldJobId = TypeConverter.fromYarn(jobId);
|
||||
this.committer = committer;
|
||||
this.newApiCommitter = newApiCommitter;
|
||||
|
||||
this.taskAttemptListener = taskAttemptListener;
|
||||
|
@ -888,10 +893,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
}
|
||||
}
|
||||
|
||||
protected void scheduleTasks(Set<TaskId> taskIDs) {
|
||||
protected void scheduleTasks(Set<TaskId> taskIDs,
|
||||
boolean recoverTaskOutput) {
|
||||
for (TaskId taskID : taskIDs) {
|
||||
eventHandler.handle(new TaskEvent(taskID,
|
||||
TaskEventType.T_SCHEDULE));
|
||||
TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
|
||||
if (taskInfo != null) {
|
||||
eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
|
||||
committer, recoverTaskOutput));
|
||||
} else {
|
||||
eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1421,7 +1432,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
job.conf, splits[i],
|
||||
job.taskAttemptListener,
|
||||
job.jobToken, job.fsTokens,
|
||||
job.clock, job.completedTasksFromPreviousRun,
|
||||
job.clock,
|
||||
job.applicationAttemptId.getAttemptId(),
|
||||
job.metrics, job.appContext);
|
||||
job.addTask(task);
|
||||
|
@ -1439,7 +1450,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
job.conf, job.numMapTasks,
|
||||
job.taskAttemptListener, job.jobToken,
|
||||
job.fsTokens, job.clock,
|
||||
job.completedTasksFromPreviousRun,
|
||||
job.applicationAttemptId.getAttemptId(),
|
||||
job.metrics, job.appContext);
|
||||
job.addTask(task);
|
||||
|
@ -1475,8 +1485,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
@Override
|
||||
public void transition(JobImpl job, JobEvent event) {
|
||||
job.setupProgress = 1.0f;
|
||||
job.scheduleTasks(job.mapTasks); // schedule (i.e., start) the maps
|
||||
job.scheduleTasks(job.reduceTasks);
|
||||
job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
|
||||
job.scheduleTasks(job.reduceTasks, true);
|
||||
|
||||
// If we have no tasks, just transition to job completed
|
||||
if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
|
||||
|
@ -1507,7 +1517,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|||
*/
|
||||
@Override
|
||||
public void transition(JobImpl job, JobEvent event) {
|
||||
job.startTime = job.clock.getTime();
|
||||
JobStartEvent jse = (JobStartEvent) event;
|
||||
if (jse.getRecoveredJobStartTime() != 0) {
|
||||
job.startTime = jse.getRecoveredJobStartTime();
|
||||
} else {
|
||||
job.startTime = job.clock.getTime();
|
||||
}
|
||||
JobInitedEvent jie =
|
||||
new JobInitedEvent(job.oldJobId,
|
||||
job.startTime,
|
||||
|
|
|
@ -18,17 +18,13 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
||||
|
@ -49,11 +45,10 @@ public class MapTaskImpl extends TaskImpl {
|
|||
TaskAttemptListener taskAttemptListener,
|
||||
Token<JobTokenIdentifier> jobToken,
|
||||
Credentials credentials, Clock clock,
|
||||
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
|
||||
MRAppMetrics metrics, AppContext appContext) {
|
||||
int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
|
||||
super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile,
|
||||
conf, taskAttemptListener, jobToken, credentials, clock,
|
||||
completedTasksFromPreviousRun, startCount, metrics, appContext);
|
||||
appAttemptId, metrics, appContext);
|
||||
this.taskSplitMetaInfo = taskSplitMetaInfo;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,16 +18,12 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
||||
|
@ -47,11 +43,10 @@ public class ReduceTaskImpl extends TaskImpl {
|
|||
int numMapTasks, TaskAttemptListener taskAttemptListener,
|
||||
Token<JobTokenIdentifier> jobToken,
|
||||
Credentials credentials, Clock clock,
|
||||
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
|
||||
MRAppMetrics metrics, AppContext appContext) {
|
||||
int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
|
||||
super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf,
|
||||
taskAttemptListener, jobToken, credentials, clock,
|
||||
completedTasksFromPreviousRun, startCount, metrics, appContext);
|
||||
appAttemptId, metrics, appContext);
|
||||
this.numMapTasks = numMapTasks;
|
||||
}
|
||||
|
||||
|
|
|
@ -56,10 +56,12 @@ import org.apache.hadoop.mapreduce.Counter;
|
|||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.JobCounter;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
|
||||
|
@ -89,6 +91,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||
|
@ -204,6 +207,11 @@ public abstract class TaskAttemptImpl implements
|
|||
TaskAttemptEventType.TA_KILL, new KilledTransition())
|
||||
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
|
||||
TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
|
||||
.addTransition(TaskAttemptStateInternal.NEW,
|
||||
EnumSet.of(TaskAttemptStateInternal.FAILED,
|
||||
TaskAttemptStateInternal.KILLED,
|
||||
TaskAttemptStateInternal.SUCCEEDED),
|
||||
TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
|
||||
.addTransition(TaskAttemptStateInternal.NEW,
|
||||
TaskAttemptStateInternal.NEW,
|
||||
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
|
||||
|
@ -1082,6 +1090,102 @@ public abstract class TaskAttemptImpl implements
|
|||
this.avataar = avataar;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
|
||||
OutputCommitter committer, boolean recoverOutput) {
|
||||
containerID = taInfo.getContainerId();
|
||||
containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
|
||||
+ taInfo.getPort());
|
||||
containerMgrAddress = StringInterner.weakIntern(
|
||||
containerNodeId.toString());
|
||||
nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
|
||||
+ taInfo.getHttpPort());
|
||||
computeRackAndLocality();
|
||||
launchTime = taInfo.getStartTime();
|
||||
finishTime = (taInfo.getFinishTime() != -1) ?
|
||||
taInfo.getFinishTime() : clock.getTime();
|
||||
shufflePort = taInfo.getShufflePort();
|
||||
trackerName = taInfo.getHostname();
|
||||
httpPort = taInfo.getHttpPort();
|
||||
sendLaunchedEvents();
|
||||
|
||||
reportedStatus.id = attemptId;
|
||||
reportedStatus.progress = 1.0f;
|
||||
reportedStatus.counters = taInfo.getCounters();
|
||||
reportedStatus.stateString = taInfo.getState();
|
||||
reportedStatus.phase = Phase.CLEANUP;
|
||||
reportedStatus.mapFinishTime = taInfo.getMapFinishTime();
|
||||
reportedStatus.shuffleFinishTime = taInfo.getShuffleFinishTime();
|
||||
reportedStatus.sortFinishTime = taInfo.getSortFinishTime();
|
||||
addDiagnosticInfo(taInfo.getError());
|
||||
|
||||
boolean needToClean = false;
|
||||
String recoveredState = taInfo.getTaskStatus();
|
||||
if (recoverOutput
|
||||
&& TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
|
||||
TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
|
||||
TypeConverter.fromYarn(attemptId));
|
||||
try {
|
||||
committer.recoverTask(tac);
|
||||
LOG.info("Recovered output from task attempt " + attemptId);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Unable to recover task attempt " + attemptId, e);
|
||||
LOG.info("Task attempt " + attemptId + " will be recovered as KILLED");
|
||||
recoveredState = TaskAttemptState.KILLED.toString();
|
||||
needToClean = true;
|
||||
}
|
||||
}
|
||||
|
||||
TaskAttemptStateInternal attemptState;
|
||||
if (TaskAttemptState.SUCCEEDED.toString().equals(recoveredState)) {
|
||||
attemptState = TaskAttemptStateInternal.SUCCEEDED;
|
||||
reportedStatus.taskState = TaskAttemptState.SUCCEEDED;
|
||||
eventHandler.handle(createJobCounterUpdateEventTASucceeded(this));
|
||||
logAttemptFinishedEvent(attemptState);
|
||||
} else if (TaskAttemptState.FAILED.toString().equals(recoveredState)) {
|
||||
attemptState = TaskAttemptStateInternal.FAILED;
|
||||
reportedStatus.taskState = TaskAttemptState.FAILED;
|
||||
eventHandler.handle(createJobCounterUpdateEventTAFailed(this, false));
|
||||
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
||||
createTaskAttemptUnsuccessfulCompletionEvent(this,
|
||||
TaskAttemptStateInternal.FAILED);
|
||||
eventHandler.handle(
|
||||
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
|
||||
} else {
|
||||
if (!TaskAttemptState.KILLED.toString().equals(recoveredState)) {
|
||||
if (String.valueOf(recoveredState).isEmpty()) {
|
||||
LOG.info("TaskAttempt" + attemptId
|
||||
+ " had not completed, recovering as KILLED");
|
||||
} else {
|
||||
LOG.warn("TaskAttempt " + attemptId + " found in unexpected state "
|
||||
+ recoveredState + ", recovering as KILLED");
|
||||
}
|
||||
addDiagnosticInfo("Killed during application recovery");
|
||||
needToClean = true;
|
||||
}
|
||||
attemptState = TaskAttemptStateInternal.KILLED;
|
||||
reportedStatus.taskState = TaskAttemptState.KILLED;
|
||||
eventHandler.handle(createJobCounterUpdateEventTAKilled(this, false));
|
||||
TaskAttemptUnsuccessfulCompletionEvent tauce =
|
||||
createTaskAttemptUnsuccessfulCompletionEvent(this,
|
||||
TaskAttemptStateInternal.KILLED);
|
||||
eventHandler.handle(
|
||||
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tauce));
|
||||
}
|
||||
|
||||
if (needToClean) {
|
||||
TaskAttemptContext tac = new TaskAttemptContextImpl(conf,
|
||||
TypeConverter.fromYarn(attemptId));
|
||||
try {
|
||||
committer.abortTask(tac);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Task cleanup failed for attempt " + attemptId, e);
|
||||
}
|
||||
}
|
||||
|
||||
return attemptState;
|
||||
}
|
||||
|
||||
private static TaskAttemptState getExternalState(
|
||||
TaskAttemptStateInternal smState) {
|
||||
switch (smState) {
|
||||
|
@ -1122,6 +1226,24 @@ public abstract class TaskAttemptImpl implements
|
|||
}
|
||||
}
|
||||
|
||||
private void computeRackAndLocality() {
|
||||
nodeRackName = RackResolver.resolve(
|
||||
containerNodeId.getHost()).getNetworkLocation();
|
||||
|
||||
locality = Locality.OFF_SWITCH;
|
||||
if (dataLocalHosts.size() > 0) {
|
||||
String cHost = resolveHost(containerNodeId.getHost());
|
||||
if (dataLocalHosts.contains(cHost)) {
|
||||
locality = Locality.NODE_LOCAL;
|
||||
}
|
||||
}
|
||||
if (locality == Locality.OFF_SWITCH) {
|
||||
if (dataLocalRacks.contains(nodeRackName)) {
|
||||
locality = Locality.RACK_LOCAL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
|
||||
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
|
||||
int slotMemoryReq =
|
||||
|
@ -1141,6 +1263,18 @@ public abstract class TaskAttemptImpl implements
|
|||
return slotMillisIncrement;
|
||||
}
|
||||
|
||||
private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
|
||||
TaskAttemptImpl taskAttempt) {
|
||||
long slotMillis = computeSlotMillis(taskAttempt);
|
||||
TaskId taskId = taskAttempt.attemptId.getTaskId();
|
||||
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
|
||||
jce.addCounterUpdate(
|
||||
taskId.getTaskType() == TaskType.MAP ?
|
||||
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
|
||||
slotMillis);
|
||||
return jce;
|
||||
}
|
||||
|
||||
private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
|
||||
TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
|
||||
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
|
||||
|
@ -1210,6 +1344,26 @@ public abstract class TaskAttemptImpl implements
|
|||
return tauce;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void sendLaunchedEvents() {
|
||||
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
|
||||
.getJobId());
|
||||
jce.addCounterUpdate(attemptId.getTaskId().getTaskType() == TaskType.MAP ?
|
||||
JobCounter.TOTAL_LAUNCHED_MAPS : JobCounter.TOTAL_LAUNCHED_REDUCES, 1);
|
||||
eventHandler.handle(jce);
|
||||
|
||||
LOG.info("TaskAttempt: [" + attemptId
|
||||
+ "] using containerId: [" + containerID + " on NM: ["
|
||||
+ containerMgrAddress + "]");
|
||||
TaskAttemptStartedEvent tase =
|
||||
new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
|
||||
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
|
||||
launchTime, trackerName, httpPort, shufflePort, containerID,
|
||||
locality.toString(), avataar.toString());
|
||||
eventHandler.handle(
|
||||
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
|
||||
}
|
||||
|
||||
private WrappedProgressSplitsBlock getProgressSplitBlock() {
|
||||
readLock.lock();
|
||||
try {
|
||||
|
@ -1342,8 +1496,6 @@ public abstract class TaskAttemptImpl implements
|
|||
taskAttempt.containerNodeId.toString());
|
||||
taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
|
||||
cEvent.getContainer().getNodeHttpAddress());
|
||||
taskAttempt.nodeRackName = RackResolver.resolve(
|
||||
taskAttempt.containerNodeId.getHost()).getNetworkLocation();
|
||||
taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
|
||||
taskAttempt.assignedCapability = cEvent.getContainer().getResource();
|
||||
// this is a _real_ Task (classic Hadoop mapred flavor):
|
||||
|
@ -1354,19 +1506,7 @@ public abstract class TaskAttemptImpl implements
|
|||
taskAttempt.taskAttemptListener.registerPendingTask(
|
||||
taskAttempt.remoteTask, taskAttempt.jvmID);
|
||||
|
||||
taskAttempt.locality = Locality.OFF_SWITCH;
|
||||
if (taskAttempt.dataLocalHosts.size() > 0) {
|
||||
String cHost = taskAttempt.resolveHost(
|
||||
taskAttempt.containerNodeId.getHost());
|
||||
if (taskAttempt.dataLocalHosts.contains(cHost)) {
|
||||
taskAttempt.locality = Locality.NODE_LOCAL;
|
||||
}
|
||||
}
|
||||
if (taskAttempt.locality == Locality.OFF_SWITCH) {
|
||||
if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) {
|
||||
taskAttempt.locality = Locality.RACK_LOCAL;
|
||||
}
|
||||
}
|
||||
taskAttempt.computeRackAndLocality();
|
||||
|
||||
//launch the container
|
||||
//create the container object to be launched for a given Task attempt
|
||||
|
@ -1471,27 +1611,7 @@ public abstract class TaskAttemptImpl implements
|
|||
// Costly?
|
||||
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
|
||||
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
|
||||
JobCounterUpdateEvent jce =
|
||||
new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
|
||||
.getJobId());
|
||||
jce.addCounterUpdate(
|
||||
taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ?
|
||||
JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES
|
||||
, 1);
|
||||
taskAttempt.eventHandler.handle(jce);
|
||||
|
||||
LOG.info("TaskAttempt: [" + taskAttempt.attemptId
|
||||
+ "] using containerId: [" + taskAttempt.containerID + " on NM: ["
|
||||
+ taskAttempt.containerMgrAddress + "]");
|
||||
TaskAttemptStartedEvent tase =
|
||||
new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
|
||||
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
|
||||
taskAttempt.launchTime,
|
||||
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
|
||||
taskAttempt.shufflePort, taskAttempt.containerID,
|
||||
taskAttempt.locality.toString(), taskAttempt.avataar.toString());
|
||||
taskAttempt.eventHandler.handle
|
||||
(new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
|
||||
taskAttempt.sendLaunchedEvents();
|
||||
taskAttempt.eventHandler.handle
|
||||
(new SpeculatorEvent
|
||||
(taskAttempt.attemptId, true, taskAttempt.clock.getTime()));
|
||||
|
@ -1540,14 +1660,8 @@ public abstract class TaskAttemptImpl implements
|
|||
TaskAttemptEvent event) {
|
||||
//set the finish time
|
||||
taskAttempt.setFinishTime();
|
||||
long slotMillis = computeSlotMillis(taskAttempt);
|
||||
TaskId taskId = taskAttempt.attemptId.getTaskId();
|
||||
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
|
||||
jce.addCounterUpdate(
|
||||
taskId.getTaskType() == TaskType.MAP ?
|
||||
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
|
||||
slotMillis);
|
||||
taskAttempt.eventHandler.handle(jce);
|
||||
taskAttempt.eventHandler.handle(
|
||||
createJobCounterUpdateEventTASucceeded(taskAttempt));
|
||||
taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
|
||||
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
|
||||
taskAttempt.attemptId,
|
||||
|
@ -1585,6 +1699,18 @@ public abstract class TaskAttemptImpl implements
|
|||
}
|
||||
}
|
||||
|
||||
private static class RecoverTransition implements
|
||||
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
|
||||
|
||||
@Override
|
||||
public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
|
||||
TaskAttemptEvent event) {
|
||||
TaskAttemptRecoverEvent tare = (TaskAttemptRecoverEvent) event;
|
||||
return taskAttempt.recover(tare.getTaskAttemptInfo(),
|
||||
tare.getCommitter(), tare.getRecoverOutput());
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked" })
|
||||
private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
|
||||
//Log finished events only if an attempt started.
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.EnumSet;
|
||||
|
@ -37,7 +38,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||
|
@ -69,8 +70,10 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
|
||||
|
@ -152,6 +155,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
|
||||
.addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
|
||||
TaskEventType.T_KILL, new KillNewTransition())
|
||||
.addTransition(TaskStateInternal.NEW,
|
||||
EnumSet.of(TaskStateInternal.FAILED,
|
||||
TaskStateInternal.KILLED,
|
||||
TaskStateInternal.RUNNING,
|
||||
TaskStateInternal.SUCCEEDED),
|
||||
TaskEventType.T_RECOVER, new RecoverTransition())
|
||||
|
||||
// Transitions from SCHEDULED state
|
||||
//when the first attempt is launched, the task state is set to RUNNING
|
||||
|
@ -250,20 +259,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
|
||||
// By default, the next TaskAttempt number is zero. Changes during recovery
|
||||
protected int nextAttemptNumber = 0;
|
||||
private List<TaskAttemptInfo> taskAttemptsFromPreviousGeneration =
|
||||
new ArrayList<TaskAttemptInfo>();
|
||||
|
||||
private static final class RecoverdAttemptsComparator implements
|
||||
Comparator<TaskAttemptInfo> {
|
||||
@Override
|
||||
public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) {
|
||||
long diff = attempt1.getStartTime() - attempt2.getStartTime();
|
||||
return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
|
||||
}
|
||||
}
|
||||
|
||||
private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
|
||||
new RecoverdAttemptsComparator();
|
||||
// For sorting task attempts by completion time
|
||||
private static final Comparator<TaskAttemptInfo> TA_INFO_COMPARATOR =
|
||||
new Comparator<TaskAttemptInfo>() {
|
||||
@Override
|
||||
public int compare(TaskAttemptInfo a, TaskAttemptInfo b) {
|
||||
long diff = a.getFinishTime() - b.getFinishTime();
|
||||
return diff == 0 ? 0 : (diff < 0 ? -1 : 1);
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public TaskState getState() {
|
||||
|
@ -280,8 +285,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
TaskAttemptListener taskAttemptListener,
|
||||
Token<JobTokenIdentifier> jobToken,
|
||||
Credentials credentials, Clock clock,
|
||||
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
|
||||
MRAppMetrics metrics, AppContext appContext) {
|
||||
int appAttemptId, MRAppMetrics metrics, AppContext appContext) {
|
||||
this.conf = conf;
|
||||
this.clock = clock;
|
||||
this.jobFile = remoteJobConfFile;
|
||||
|
@ -307,41 +311,15 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
|
||||
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
|
||||
|
||||
// See if this is from a previous generation.
|
||||
if (completedTasksFromPreviousRun != null
|
||||
&& completedTasksFromPreviousRun.containsKey(taskId)) {
|
||||
// This task has TaskAttempts from previous generation. We have to replay
|
||||
// them.
|
||||
LOG.info("Task is from previous run " + taskId);
|
||||
TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
|
||||
Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
|
||||
taskInfo.getAllTaskAttempts();
|
||||
taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
|
||||
taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
|
||||
Collections.sort(taskAttemptsFromPreviousGeneration,
|
||||
RECOVERED_ATTEMPTS_COMPARATOR);
|
||||
}
|
||||
|
||||
if (taskAttemptsFromPreviousGeneration.isEmpty()) {
|
||||
// All the previous attempts are exhausted, now start with a new
|
||||
// generation.
|
||||
|
||||
// All the new TaskAttemptIDs are generated based on MR
|
||||
// ApplicationAttemptID so that attempts from previous lives don't
|
||||
// over-step the current one. This assumes that a task won't have more
|
||||
// than 1000 attempts in its single generation, which is very reasonable.
|
||||
// Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
|
||||
// and requires serious medical attention.
|
||||
nextAttemptNumber = (startCount - 1) * 1000;
|
||||
} else {
|
||||
// There are still some TaskAttempts from previous generation, use them
|
||||
nextAttemptNumber =
|
||||
taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
|
||||
}
|
||||
|
||||
// This "this leak" is okay because the retained pointer is in an
|
||||
// instance variable.
|
||||
stateMachine = stateMachineFactory.make(this);
|
||||
|
||||
// All the new TaskAttemptIDs are generated based on MR
|
||||
// ApplicationAttemptID so that attempts from previous lives don't
|
||||
// over-step the current one. This assumes that a task won't have more
|
||||
// than 1000 attempts in its single generation, which is very reasonable.
|
||||
nextAttemptNumber = (appAttemptId - 1) * 1000;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -600,14 +578,28 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
|
||||
// This is always called in the Write Lock
|
||||
private void addAndScheduleAttempt(Avataar avataar) {
|
||||
TaskAttempt attempt = createAttempt();
|
||||
((TaskAttemptImpl) attempt).setAvataar(avataar);
|
||||
TaskAttempt attempt = addAttempt(avataar);
|
||||
inProgressAttempts.add(attempt.getID());
|
||||
//schedule the nextAttemptNumber
|
||||
if (failedAttempts.size() > 0) {
|
||||
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
|
||||
TaskAttemptEventType.TA_RESCHEDULE));
|
||||
} else {
|
||||
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
|
||||
TaskAttemptEventType.TA_SCHEDULE));
|
||||
}
|
||||
}
|
||||
|
||||
private TaskAttemptImpl addAttempt(Avataar avataar) {
|
||||
TaskAttemptImpl attempt = createAttempt();
|
||||
attempt.setAvataar(avataar);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Created attempt " + attempt.getID());
|
||||
}
|
||||
switch (attempts.size()) {
|
||||
case 0:
|
||||
attempts = Collections.singletonMap(attempt.getID(), attempt);
|
||||
attempts = Collections.singletonMap(attempt.getID(),
|
||||
(TaskAttempt) attempt);
|
||||
break;
|
||||
|
||||
case 1:
|
||||
|
@ -623,24 +615,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
break;
|
||||
}
|
||||
|
||||
// Update nextATtemptNumber
|
||||
if (taskAttemptsFromPreviousGeneration.isEmpty()) {
|
||||
++nextAttemptNumber;
|
||||
} else {
|
||||
// There are still some TaskAttempts from previous generation, use them
|
||||
nextAttemptNumber =
|
||||
taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
|
||||
}
|
||||
|
||||
inProgressAttempts.add(attempt.getID());
|
||||
//schedule the nextAttemptNumber
|
||||
if (failedAttempts.size() > 0) {
|
||||
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
|
||||
TaskAttemptEventType.TA_RESCHEDULE));
|
||||
} else {
|
||||
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
|
||||
TaskAttemptEventType.TA_SCHEDULE));
|
||||
}
|
||||
++nextAttemptNumber;
|
||||
return attempt;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -705,6 +681,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
}
|
||||
}
|
||||
|
||||
private void sendTaskStartedEvent() {
|
||||
TaskStartedEvent tse = new TaskStartedEvent(
|
||||
TypeConverter.fromYarn(taskId), getLaunchTime(),
|
||||
TypeConverter.fromYarn(taskId.getTaskType()),
|
||||
getSplitsAsString());
|
||||
eventHandler
|
||||
.handle(new JobHistoryEvent(taskId.getJobId(), tse));
|
||||
historyTaskStartGenerated = true;
|
||||
}
|
||||
|
||||
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
|
||||
TaskFinishedEvent tfe =
|
||||
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
|
||||
|
@ -740,6 +726,16 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
task.successfulAttempt = null;
|
||||
}
|
||||
|
||||
private void sendTaskSucceededEvents() {
|
||||
eventHandler.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
|
||||
LOG.info("Task succeeded with attempt " + successfulAttempt);
|
||||
if (historyTaskStartGenerated) {
|
||||
TaskFinishedEvent tfe = createTaskFinishedEvent(this,
|
||||
TaskStateInternal.SUCCEEDED);
|
||||
eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a String representation of the splits.
|
||||
*
|
||||
|
@ -751,6 +747,122 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
return "";
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover a completed task from a previous application attempt
|
||||
* @param taskInfo recovered info about the task
|
||||
* @param recoverTaskOutput whether to recover task outputs
|
||||
* @return state of the task after recovery
|
||||
*/
|
||||
private TaskStateInternal recover(TaskInfo taskInfo,
|
||||
OutputCommitter committer, boolean recoverTaskOutput) {
|
||||
LOG.info("Recovering task " + taskId
|
||||
+ " from prior app attempt, status was " + taskInfo.getTaskStatus());
|
||||
|
||||
scheduledTime = taskInfo.getStartTime();
|
||||
sendTaskStartedEvent();
|
||||
Collection<TaskAttemptInfo> attemptInfos =
|
||||
taskInfo.getAllTaskAttempts().values();
|
||||
|
||||
if (attemptInfos.size() > 0) {
|
||||
metrics.launchedTask(this);
|
||||
}
|
||||
|
||||
// recover the attempts for this task in the order they finished
|
||||
// so task attempt completion events are ordered properly
|
||||
int savedNextAttemptNumber = nextAttemptNumber;
|
||||
ArrayList<TaskAttemptInfo> taInfos =
|
||||
new ArrayList<TaskAttemptInfo>(taskInfo.getAllTaskAttempts().values());
|
||||
Collections.sort(taInfos, TA_INFO_COMPARATOR);
|
||||
for (TaskAttemptInfo taInfo : taInfos) {
|
||||
nextAttemptNumber = taInfo.getAttemptId().getId();
|
||||
TaskAttemptImpl attempt = addAttempt(Avataar.VIRGIN);
|
||||
// handle the recovery inline so attempts complete before task does
|
||||
attempt.handle(new TaskAttemptRecoverEvent(attempt.getID(), taInfo,
|
||||
committer, recoverTaskOutput));
|
||||
finishedAttempts.add(attempt.getID());
|
||||
TaskAttemptCompletionEventStatus taces = null;
|
||||
TaskAttemptState attemptState = attempt.getState();
|
||||
switch (attemptState) {
|
||||
case FAILED:
|
||||
taces = TaskAttemptCompletionEventStatus.FAILED;
|
||||
break;
|
||||
case KILLED:
|
||||
taces = TaskAttemptCompletionEventStatus.KILLED;
|
||||
break;
|
||||
case SUCCEEDED:
|
||||
taces = TaskAttemptCompletionEventStatus.SUCCEEDED;
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException(
|
||||
"Unexpected attempt state during recovery: " + attemptState);
|
||||
}
|
||||
if (attemptState == TaskAttemptState.FAILED) {
|
||||
failedAttempts.add(attempt.getID());
|
||||
if (failedAttempts.size() >= maxAttempts) {
|
||||
taces = TaskAttemptCompletionEventStatus.TIPFAILED;
|
||||
}
|
||||
}
|
||||
|
||||
// don't clobber the successful attempt completion event
|
||||
// TODO: this shouldn't be necessary after MAPREDUCE-4330
|
||||
if (successfulAttempt == null) {
|
||||
handleTaskAttemptCompletion(attempt.getID(), taces);
|
||||
if (attemptState == TaskAttemptState.SUCCEEDED) {
|
||||
successfulAttempt = attempt.getID();
|
||||
}
|
||||
}
|
||||
}
|
||||
nextAttemptNumber = savedNextAttemptNumber;
|
||||
|
||||
TaskStateInternal taskState = TaskStateInternal.valueOf(
|
||||
taskInfo.getTaskStatus());
|
||||
switch (taskState) {
|
||||
case SUCCEEDED:
|
||||
if (successfulAttempt != null) {
|
||||
sendTaskSucceededEvents();
|
||||
} else {
|
||||
LOG.info("Missing successful attempt for task " + taskId
|
||||
+ ", recovering as RUNNING");
|
||||
// there must have been a fetch failure and the retry wasn't complete
|
||||
taskState = TaskStateInternal.RUNNING;
|
||||
metrics.runningTask(this);
|
||||
addAndScheduleAttempt(Avataar.VIRGIN);
|
||||
}
|
||||
break;
|
||||
case FAILED:
|
||||
case KILLED:
|
||||
{
|
||||
if (taskState == TaskStateInternal.KILLED && attemptInfos.size() == 0) {
|
||||
metrics.endWaitingTask(this);
|
||||
}
|
||||
TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(),
|
||||
taskInfo.getFinishTime(), taskInfo.getTaskType(),
|
||||
taskInfo.getError(), taskInfo.getTaskStatus(),
|
||||
taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters());
|
||||
eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe));
|
||||
eventHandler.handle(
|
||||
new JobTaskEvent(taskId, getExternalState(taskState)));
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new java.lang.AssertionError("Unexpected recovered task state: "
|
||||
+ taskState);
|
||||
}
|
||||
|
||||
return taskState;
|
||||
}
|
||||
|
||||
private static class RecoverTransition
|
||||
implements MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
|
||||
|
||||
@Override
|
||||
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
|
||||
TaskRecoverEvent tre = (TaskRecoverEvent) event;
|
||||
return task.recover(tre.getTaskInfo(), tre.getOutputCommitter(),
|
||||
tre.getRecoverTaskOutput());
|
||||
}
|
||||
}
|
||||
|
||||
private static class InitialScheduleTransition
|
||||
implements SingleArcTransition<TaskImpl, TaskEvent> {
|
||||
|
||||
|
@ -758,13 +870,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
public void transition(TaskImpl task, TaskEvent event) {
|
||||
task.addAndScheduleAttempt(Avataar.VIRGIN);
|
||||
task.scheduledTime = task.clock.getTime();
|
||||
TaskStartedEvent tse = new TaskStartedEvent(
|
||||
TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
|
||||
TypeConverter.fromYarn(task.taskId.getTaskType()),
|
||||
task.getSplitsAsString());
|
||||
task.eventHandler
|
||||
.handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
|
||||
task.historyTaskStartGenerated = true;
|
||||
task.sendTaskStartedEvent();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -818,16 +924,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
|
|||
task.finishedAttempts.add(taskAttemptId);
|
||||
task.inProgressAttempts.remove(taskAttemptId);
|
||||
task.successfulAttempt = taskAttemptId;
|
||||
task.eventHandler.handle(new JobTaskEvent(
|
||||
task.taskId, TaskState.SUCCEEDED));
|
||||
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
|
||||
// issue kill to all other attempts
|
||||
if (task.historyTaskStartGenerated) {
|
||||
TaskFinishedEvent tfe = createTaskFinishedEvent(task,
|
||||
TaskStateInternal.SUCCEEDED);
|
||||
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
|
||||
tfe));
|
||||
}
|
||||
task.sendTaskSucceededEvents();
|
||||
for (TaskAttempt attempt : task.attempts.values()) {
|
||||
if (attempt.getID() != task.successfulAttempt &&
|
||||
// This is okay because it can only talk us out of sending a
|
||||
|
|
|
@ -1,39 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.recover;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.yarn.Clock;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
|
||||
public interface Recovery {
|
||||
|
||||
Dispatcher getDispatcher();
|
||||
|
||||
Clock getClock();
|
||||
|
||||
Map<TaskId, TaskInfo> getCompletedTasks();
|
||||
|
||||
List<AMInfo> getAMInfos();
|
||||
}
|
|
@ -1,480 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.recover;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
|
||||
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
||||
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
import org.apache.hadoop.yarn.Clock;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
/*
|
||||
* Recovers the completed tasks from the previous life of Application Master.
|
||||
* The completed tasks are deciphered from the history file of the previous life.
|
||||
* Recovery service intercepts and replay the events for completed tasks.
|
||||
* While recovery is in progress, the scheduling of new tasks are delayed by
|
||||
* buffering the task schedule events.
|
||||
* The recovery service controls the clock while recovery is in progress.
|
||||
*/
|
||||
|
||||
//TODO:
|
||||
//task cleanup for all non completed tasks
|
||||
public class RecoveryService extends CompositeService implements Recovery {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
|
||||
|
||||
private final ApplicationAttemptId applicationAttemptId;
|
||||
private final OutputCommitter committer;
|
||||
private final boolean newApiCommitter;
|
||||
private final Dispatcher dispatcher;
|
||||
private final ControlledClock clock;
|
||||
|
||||
private JobInfo jobInfo = null;
|
||||
private final Map<TaskId, TaskInfo> completedTasks =
|
||||
new HashMap<TaskId, TaskInfo>();
|
||||
|
||||
private final List<TaskEvent> pendingTaskScheduleEvents =
|
||||
new ArrayList<TaskEvent>();
|
||||
|
||||
private volatile boolean recoveryMode = false;
|
||||
|
||||
public RecoveryService(ApplicationAttemptId applicationAttemptId,
|
||||
Clock clock, OutputCommitter committer, boolean newApiCommitter) {
|
||||
super("RecoveringDispatcher");
|
||||
this.applicationAttemptId = applicationAttemptId;
|
||||
this.committer = committer;
|
||||
this.newApiCommitter = newApiCommitter;
|
||||
this.dispatcher = createRecoveryDispatcher();
|
||||
this.clock = new ControlledClock(clock);
|
||||
addService((Service) dispatcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf) {
|
||||
super.init(conf);
|
||||
// parse the history file
|
||||
try {
|
||||
parse();
|
||||
} catch (Exception e) {
|
||||
LOG.warn(e);
|
||||
LOG.warn("Could not parse the old history file. Aborting recovery. "
|
||||
+ "Starting afresh.", e);
|
||||
}
|
||||
if (completedTasks.size() > 0) {
|
||||
recoveryMode = true;
|
||||
LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
|
||||
+ "TO RECOVER " + completedTasks.size());
|
||||
LOG.info("Job launch time " + jobInfo.getLaunchTime());
|
||||
clock.setTime(jobInfo.getLaunchTime());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Dispatcher getDispatcher() {
|
||||
return dispatcher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Clock getClock() {
|
||||
return clock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<TaskId, TaskInfo> getCompletedTasks() {
|
||||
return completedTasks;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AMInfo> getAMInfos() {
|
||||
if (jobInfo == null || jobInfo.getAMInfos() == null) {
|
||||
return new LinkedList<AMInfo>();
|
||||
}
|
||||
List<AMInfo> amInfos = new LinkedList<AMInfo>();
|
||||
for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
|
||||
.getAMInfos()) {
|
||||
AMInfo amInfo =
|
||||
MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
|
||||
jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
|
||||
jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
|
||||
jhAmInfo.getNodeManagerHttpPort());
|
||||
|
||||
amInfos.add(amInfo);
|
||||
}
|
||||
return amInfos;
|
||||
}
|
||||
|
||||
private void parse() throws IOException {
|
||||
FSDataInputStream in =
|
||||
getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
|
||||
JobHistoryParser parser = new JobHistoryParser(in);
|
||||
jobInfo = parser.parse();
|
||||
Exception parseException = parser.getParseException();
|
||||
if (parseException != null) {
|
||||
LOG.info("Got an error parsing job-history file" +
|
||||
", ignoring incomplete events.", parseException);
|
||||
}
|
||||
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
|
||||
.getAllTasks();
|
||||
for (TaskInfo taskInfo : taskInfos.values()) {
|
||||
if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
|
||||
Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
|
||||
taskInfo.getAllTaskAttempts().entrySet().iterator();
|
||||
while (taskAttemptIterator.hasNext()) {
|
||||
Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
|
||||
if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
|
||||
taskAttemptIterator.remove();
|
||||
}
|
||||
}
|
||||
completedTasks
|
||||
.put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
|
||||
LOG.info("Read from history task "
|
||||
+ TypeConverter.toYarn(taskInfo.getTaskId()));
|
||||
}
|
||||
}
|
||||
LOG.info("Read completed tasks from history "
|
||||
+ completedTasks.size());
|
||||
}
|
||||
|
||||
public static FSDataInputStream getPreviousJobHistoryFileStream(
|
||||
Configuration conf, ApplicationAttemptId applicationAttemptId)
|
||||
throws IOException {
|
||||
FSDataInputStream in = null;
|
||||
Path historyFile = null;
|
||||
String jobId =
|
||||
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
|
||||
.toString();
|
||||
String jobhistoryDir =
|
||||
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
|
||||
Path histDirPath =
|
||||
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
|
||||
LOG.info("Trying file " + histDirPath.toString());
|
||||
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
|
||||
// read the previous history file
|
||||
historyFile =
|
||||
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
|
||||
jobId, (applicationAttemptId.getAttemptId() - 1)));
|
||||
LOG.info("History file is at " + historyFile);
|
||||
in = fc.open(historyFile);
|
||||
return in;
|
||||
}
|
||||
|
||||
protected Dispatcher createRecoveryDispatcher() {
|
||||
return new RecoveryDispatcher();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
class RecoveryDispatcher extends AsyncDispatcher {
|
||||
private final EventHandler actualHandler;
|
||||
private final EventHandler handler;
|
||||
|
||||
RecoveryDispatcher() {
|
||||
super();
|
||||
actualHandler = super.getEventHandler();
|
||||
handler = new InterceptingEventHandler(actualHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void dispatch(Event event) {
|
||||
if (recoveryMode) {
|
||||
if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
|
||||
TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
|
||||
.getTaskAttemptID());
|
||||
LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
|
||||
clock.setTime(attInfo.getStartTime());
|
||||
|
||||
} else if (event.getType() == TaskAttemptEventType.TA_DONE
|
||||
|| event.getType() == TaskAttemptEventType.TA_FAILMSG
|
||||
|| event.getType() == TaskAttemptEventType.TA_KILL) {
|
||||
TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
|
||||
.getTaskAttemptID());
|
||||
LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
|
||||
clock.setTime(attInfo.getFinishTime());
|
||||
}
|
||||
|
||||
else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
|
||||
|| event.getType() == TaskEventType.T_ATTEMPT_KILLED
|
||||
|| event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
|
||||
TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event;
|
||||
LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
|
||||
TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
|
||||
.getTaskId());
|
||||
taskInfo.getAllTaskAttempts().remove(
|
||||
TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
|
||||
// remove the task info from completed tasks if all attempts are
|
||||
// recovered
|
||||
if (taskInfo.getAllTaskAttempts().size() == 0) {
|
||||
completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
|
||||
// checkForRecoveryComplete
|
||||
LOG.info("CompletedTasks() " + completedTasks.size());
|
||||
if (completedTasks.size() == 0) {
|
||||
recoveryMode = false;
|
||||
clock.reset();
|
||||
LOG.info("Setting the recovery mode to false. " +
|
||||
"Recovery is complete!");
|
||||
|
||||
// send all pending tasks schedule events
|
||||
for (TaskEvent tEv : pendingTaskScheduleEvents) {
|
||||
actualHandler.handle(tEv);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
realDispatch(event);
|
||||
}
|
||||
|
||||
public void realDispatch(Event event) {
|
||||
super.dispatch(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventHandler getEventHandler() {
|
||||
return handler;
|
||||
}
|
||||
}
|
||||
|
||||
private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
|
||||
TaskInfo taskInfo = completedTasks.get(id.getTaskId());
|
||||
return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private class InterceptingEventHandler implements EventHandler {
|
||||
EventHandler actualHandler;
|
||||
|
||||
InterceptingEventHandler(EventHandler actualHandler) {
|
||||
this.actualHandler = actualHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(Event event) {
|
||||
if (!recoveryMode) {
|
||||
// delegate to the dispatcher one
|
||||
actualHandler.handle(event);
|
||||
return;
|
||||
}
|
||||
|
||||
else if (event.getType() == TaskEventType.T_SCHEDULE) {
|
||||
TaskEvent taskEvent = (TaskEvent) event;
|
||||
// delay the scheduling of new tasks till previous ones are recovered
|
||||
if (completedTasks.get(taskEvent.getTaskID()) == null) {
|
||||
LOG.debug("Adding to pending task events "
|
||||
+ taskEvent.getTaskID());
|
||||
pendingTaskScheduleEvents.add(taskEvent);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
else if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
|
||||
TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID();
|
||||
TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
|
||||
LOG.debug("CONTAINER_REQ " + aId);
|
||||
sendAssignedEvent(aId, attInfo);
|
||||
return;
|
||||
}
|
||||
|
||||
else if (event.getType() == CommitterEventType.TASK_ABORT) {
|
||||
TaskAttemptId aId = ((CommitterTaskAbortEvent) event).getAttemptID();
|
||||
LOG.debug("TASK_CLEAN");
|
||||
actualHandler.handle(new TaskAttemptEvent(aId,
|
||||
TaskAttemptEventType.TA_CLEANUP_DONE));
|
||||
return;
|
||||
}
|
||||
|
||||
else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
|
||||
TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
|
||||
.getTaskAttemptID();
|
||||
TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
|
||||
actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
|
||||
attInfo.getShufflePort()));
|
||||
// send the status update event
|
||||
sendStatusUpdateEvent(aId, attInfo);
|
||||
|
||||
TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
|
||||
switch (state) {
|
||||
case SUCCEEDED:
|
||||
//recover the task output
|
||||
|
||||
// check the committer type and construct corresponding context
|
||||
TaskAttemptContext taskContext = null;
|
||||
if(newApiCommitter) {
|
||||
taskContext = new TaskAttemptContextImpl(getConfig(),
|
||||
attInfo.getAttemptId());
|
||||
} else {
|
||||
taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
|
||||
TypeConverter.fromYarn(aId));
|
||||
}
|
||||
|
||||
try {
|
||||
TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
|
||||
int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
|
||||
if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
|
||||
committer.recoverTask(taskContext);
|
||||
LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
|
||||
} else {
|
||||
LOG.info("Will not try to recover output for "
|
||||
+ taskContext.getTaskAttemptID());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Caught an exception while trying to recover task "+aId, e);
|
||||
actualHandler.handle(new JobDiagnosticsUpdateEvent(
|
||||
aId.getTaskId().getJobId(), "Error in recovering task output " +
|
||||
e.getMessage()));
|
||||
actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
|
||||
JobEventType.INTERNAL_ERROR));
|
||||
}
|
||||
|
||||
// send the done event
|
||||
LOG.info("Sending done event to recovered attempt " + aId);
|
||||
actualHandler.handle(new TaskAttemptEvent(aId,
|
||||
TaskAttemptEventType.TA_DONE));
|
||||
break;
|
||||
case KILLED:
|
||||
LOG.info("Sending kill event to recovered attempt " + aId);
|
||||
actualHandler.handle(new TaskAttemptEvent(aId,
|
||||
TaskAttemptEventType.TA_KILL));
|
||||
break;
|
||||
default:
|
||||
LOG.info("Sending fail event to recovered attempt " + aId);
|
||||
actualHandler.handle(new TaskAttemptEvent(aId,
|
||||
TaskAttemptEventType.TA_FAILMSG));
|
||||
break;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
else if (event.getType() ==
|
||||
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
|
||||
TaskAttemptId aId = ((ContainerLauncherEvent) event)
|
||||
.getTaskAttemptID();
|
||||
actualHandler.handle(
|
||||
new TaskAttemptEvent(aId,
|
||||
TaskAttemptEventType.TA_CONTAINER_CLEANED));
|
||||
return;
|
||||
}
|
||||
|
||||
// delegate to the actual handler
|
||||
actualHandler.handle(event);
|
||||
}
|
||||
|
||||
private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
|
||||
TaskAttemptInfo attemptInfo) {
|
||||
LOG.info("Sending status update event to " + yarnAttemptID);
|
||||
TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
|
||||
taskAttemptStatus.id = yarnAttemptID;
|
||||
taskAttemptStatus.progress = 1.0f;
|
||||
taskAttemptStatus.stateString = attemptInfo.getTaskStatus();
|
||||
// taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
|
||||
taskAttemptStatus.phase = Phase.CLEANUP;
|
||||
org.apache.hadoop.mapreduce.Counters cntrs = attemptInfo.getCounters();
|
||||
if (cntrs == null) {
|
||||
taskAttemptStatus.counters = null;
|
||||
} else {
|
||||
taskAttemptStatus.counters = cntrs;
|
||||
}
|
||||
actualHandler.handle(new TaskAttemptStatusUpdateEvent(
|
||||
taskAttemptStatus.id, taskAttemptStatus));
|
||||
}
|
||||
|
||||
private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
|
||||
TaskAttemptInfo attemptInfo) {
|
||||
LOG.info("Sending assigned event to " + yarnAttemptID);
|
||||
ContainerId cId = attemptInfo.getContainerId();
|
||||
|
||||
NodeId nodeId =
|
||||
ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
|
||||
+ attemptInfo.getPort());
|
||||
// Resource/Priority/ApplicationACLs are only needed while launching the
|
||||
// container on an NM, these are already completed tasks, so setting them
|
||||
// to null
|
||||
Container container = BuilderUtils.newContainer(cId, nodeId,
|
||||
attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
|
||||
null, null, null);
|
||||
actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
|
||||
container, null));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -414,7 +414,8 @@ public class MRApp extends MRAppMaster {
|
|||
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
|
||||
getDispatcher().getEventHandler(),
|
||||
getTaskAttemptListener(), getContext().getClock(),
|
||||
isNewApiCommitter(), currentUser.getUserName(), getContext(),
|
||||
getCommitter(), isNewApiCommitter(),
|
||||
currentUser.getUserName(), getContext(),
|
||||
forcedState, diagnostic);
|
||||
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
|
||||
|
||||
|
@ -648,12 +649,13 @@ public class MRApp extends MRAppMaster {
|
|||
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
|
||||
Configuration conf, EventHandler eventHandler,
|
||||
TaskAttemptListener taskAttemptListener, Clock clock,
|
||||
boolean newApiCommitter, String user, AppContext appContext,
|
||||
OutputCommitter committer, boolean newApiCommitter,
|
||||
String user, AppContext appContext,
|
||||
JobStateInternal forcedState, String diagnostic) {
|
||||
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
|
||||
conf, eventHandler, taskAttemptListener,
|
||||
new JobTokenSecretManager(), new Credentials(), clock,
|
||||
getCompletedTaskFromPreviousRun(), metrics,
|
||||
getCompletedTaskFromPreviousRun(), metrics, committer,
|
||||
newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
|
||||
appContext, forcedState, diagnostic);
|
||||
|
||||
|
|
|
@ -18,10 +18,21 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.app;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
|
@ -31,36 +42,66 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.JobCounter;
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.OutputCommitter;
|
||||
import org.apache.hadoop.mapreduce.OutputFormat;
|
||||
import org.apache.hadoop.mapreduce.RecordWriter;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||
import org.apache.hadoop.mapreduce.TaskID;
|
||||
import org.apache.hadoop.mapreduce.TaskType;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.Event;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.EventType;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.MapTaskImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
||||
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.yarn.Clock;
|
||||
import org.apache.hadoop.yarn.ClusterInfo;
|
||||
import org.apache.hadoop.yarn.SystemClock;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public class TestRecovery {
|
||||
|
@ -75,6 +116,7 @@ public class TestRecovery {
|
|||
private Text val1 = new Text("val1");
|
||||
private Text val2 = new Text("val2");
|
||||
|
||||
|
||||
/**
|
||||
* AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
|
||||
* completely disappears because of failed launch, one attempt gets killed and
|
||||
|
@ -1011,6 +1053,423 @@ public class TestRecovery {
|
|||
app.verifyCompleted();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoverySuccessAttempt() {
|
||||
LOG.info("--- START: testRecoverySuccessAttempt ---");
|
||||
|
||||
long clusterTimestamp = System.currentTimeMillis();
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
|
||||
mockEventHandler);
|
||||
|
||||
TaskId taskId = recoverMapTask.getID();
|
||||
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
|
||||
TaskID taskID = new TaskID(jobID,
|
||||
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
|
||||
|
||||
//Mock up the TaskAttempts
|
||||
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
|
||||
new HashMap<TaskAttemptID, TaskAttemptInfo>();
|
||||
|
||||
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
|
||||
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
|
||||
TaskAttemptState.SUCCEEDED);
|
||||
mockTaskAttempts.put(taId1, mockTAinfo1);
|
||||
|
||||
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
|
||||
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
|
||||
TaskAttemptState.FAILED);
|
||||
mockTaskAttempts.put(taId2, mockTAinfo2);
|
||||
|
||||
OutputCommitter mockCommitter = mock (OutputCommitter.class);
|
||||
TaskInfo mockTaskInfo = mock(TaskInfo.class);
|
||||
when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
|
||||
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
|
||||
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
|
||||
|
||||
recoverMapTask.handle(
|
||||
new TaskRecoverEvent(taskId, mockTaskInfo,mockCommitter, true));
|
||||
|
||||
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
|
||||
verify(mockEventHandler,atLeast(1)).handle(
|
||||
(org.apache.hadoop.yarn.event.Event) arg.capture());
|
||||
|
||||
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
|
||||
new HashMap<TaskAttemptID, TaskAttemptState>();
|
||||
finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
|
||||
finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
|
||||
|
||||
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
|
||||
jobHistoryEvents.add(EventType.TASK_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
|
||||
jobHistoryEvents.add(EventType.TASK_FINISHED);
|
||||
recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
|
||||
arg, jobHistoryEvents, 2L, 1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryAllFailAttempts() {
|
||||
LOG.info("--- START: testRecoveryAllFailAttempts ---");
|
||||
|
||||
long clusterTimestamp = System.currentTimeMillis();
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
|
||||
mockEventHandler);
|
||||
|
||||
TaskId taskId = recoverMapTask.getID();
|
||||
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
|
||||
TaskID taskID = new TaskID(jobID,
|
||||
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
|
||||
|
||||
//Mock up the TaskAttempts
|
||||
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
|
||||
new HashMap<TaskAttemptID, TaskAttemptInfo>();
|
||||
|
||||
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
|
||||
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
|
||||
TaskAttemptState.FAILED);
|
||||
mockTaskAttempts.put(taId1, mockTAinfo1);
|
||||
|
||||
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
|
||||
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
|
||||
TaskAttemptState.FAILED);
|
||||
mockTaskAttempts.put(taId2, mockTAinfo2);
|
||||
|
||||
OutputCommitter mockCommitter = mock (OutputCommitter.class);
|
||||
|
||||
TaskInfo mockTaskInfo = mock(TaskInfo.class);
|
||||
when(mockTaskInfo.getTaskStatus()).thenReturn("FAILED");
|
||||
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
|
||||
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
|
||||
|
||||
recoverMapTask.handle(
|
||||
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
|
||||
|
||||
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
|
||||
verify(mockEventHandler,atLeast(1)).handle(
|
||||
(org.apache.hadoop.yarn.event.Event) arg.capture());
|
||||
|
||||
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
|
||||
new HashMap<TaskAttemptID, TaskAttemptState>();
|
||||
finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
|
||||
finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
|
||||
|
||||
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
|
||||
jobHistoryEvents.add(EventType.TASK_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
|
||||
jobHistoryEvents.add(EventType.TASK_FAILED);
|
||||
recoveryChecker(recoverMapTask, TaskState.FAILED, finalAttemptStates,
|
||||
arg, jobHistoryEvents, 2L, 2L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryTaskSuccessAllAttemptsFail() {
|
||||
LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
|
||||
|
||||
long clusterTimestamp = System.currentTimeMillis();
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
|
||||
mockEventHandler);
|
||||
|
||||
TaskId taskId = recoverMapTask.getID();
|
||||
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
|
||||
TaskID taskID = new TaskID(jobID,
|
||||
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
|
||||
|
||||
//Mock up the TaskAttempts
|
||||
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
|
||||
new HashMap<TaskAttemptID, TaskAttemptInfo>();
|
||||
|
||||
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
|
||||
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
|
||||
TaskAttemptState.FAILED);
|
||||
mockTaskAttempts.put(taId1, mockTAinfo1);
|
||||
|
||||
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
|
||||
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
|
||||
TaskAttemptState.FAILED);
|
||||
mockTaskAttempts.put(taId2, mockTAinfo2);
|
||||
|
||||
OutputCommitter mockCommitter = mock (OutputCommitter.class);
|
||||
TaskInfo mockTaskInfo = mock(TaskInfo.class);
|
||||
when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
|
||||
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
|
||||
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
|
||||
|
||||
recoverMapTask.handle(
|
||||
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
|
||||
|
||||
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
|
||||
verify(mockEventHandler,atLeast(1)).handle(
|
||||
(org.apache.hadoop.yarn.event.Event) arg.capture());
|
||||
|
||||
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
|
||||
new HashMap<TaskAttemptID, TaskAttemptState>();
|
||||
finalAttemptStates.put(taId1, TaskAttemptState.FAILED);
|
||||
finalAttemptStates.put(taId2, TaskAttemptState.FAILED);
|
||||
// check for one new attempt launched since successful attempt not found
|
||||
TaskAttemptID taId3 = new TaskAttemptID(taskID, 2000);
|
||||
finalAttemptStates.put(taId3, TaskAttemptState.NEW);
|
||||
|
||||
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
|
||||
jobHistoryEvents.add(EventType.TASK_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FAILED);
|
||||
recoveryChecker(recoverMapTask, TaskState.RUNNING, finalAttemptStates,
|
||||
arg, jobHistoryEvents, 2L, 2L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryTaskSuccessAllAttemptsSucceed() {
|
||||
LOG.info("--- START: testRecoveryTaskSuccessAllAttemptsFail ---");
|
||||
|
||||
long clusterTimestamp = System.currentTimeMillis();
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
|
||||
mockEventHandler);
|
||||
|
||||
TaskId taskId = recoverMapTask.getID();
|
||||
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
|
||||
TaskID taskID = new TaskID(jobID,
|
||||
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
|
||||
|
||||
//Mock up the TaskAttempts
|
||||
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
|
||||
new HashMap<TaskAttemptID, TaskAttemptInfo>();
|
||||
|
||||
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
|
||||
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
|
||||
TaskAttemptState.SUCCEEDED);
|
||||
mockTaskAttempts.put(taId1, mockTAinfo1);
|
||||
|
||||
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
|
||||
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
|
||||
TaskAttemptState.SUCCEEDED);
|
||||
mockTaskAttempts.put(taId2, mockTAinfo2);
|
||||
|
||||
OutputCommitter mockCommitter = mock (OutputCommitter.class);
|
||||
TaskInfo mockTaskInfo = mock(TaskInfo.class);
|
||||
when(mockTaskInfo.getTaskStatus()).thenReturn("SUCCEEDED");
|
||||
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
|
||||
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
|
||||
|
||||
recoverMapTask.handle(
|
||||
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
|
||||
|
||||
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
|
||||
verify(mockEventHandler,atLeast(1)).handle(
|
||||
(org.apache.hadoop.yarn.event.Event) arg.capture());
|
||||
|
||||
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
|
||||
new HashMap<TaskAttemptID, TaskAttemptState>();
|
||||
finalAttemptStates.put(taId1, TaskAttemptState.SUCCEEDED);
|
||||
finalAttemptStates.put(taId2, TaskAttemptState.SUCCEEDED);
|
||||
|
||||
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
|
||||
jobHistoryEvents.add(EventType.TASK_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_FINISHED);
|
||||
jobHistoryEvents.add(EventType.TASK_FINISHED);
|
||||
recoveryChecker(recoverMapTask, TaskState.SUCCEEDED, finalAttemptStates,
|
||||
arg, jobHistoryEvents, 2L, 0L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryAllAttemptsKilled() {
|
||||
LOG.info("--- START: testRecoveryAllAttemptsKilled ---");
|
||||
|
||||
long clusterTimestamp = System.currentTimeMillis();
|
||||
EventHandler mockEventHandler = mock(EventHandler.class);
|
||||
MapTaskImpl recoverMapTask = getMockMapTask(clusterTimestamp,
|
||||
mockEventHandler);
|
||||
|
||||
TaskId taskId = recoverMapTask.getID();
|
||||
JobID jobID = new JobID(Long.toString(clusterTimestamp), 1);
|
||||
TaskID taskID = new TaskID(jobID,
|
||||
org.apache.hadoop.mapreduce.TaskType.MAP, taskId.getId());
|
||||
|
||||
//Mock up the TaskAttempts
|
||||
Map<TaskAttemptID, TaskAttemptInfo> mockTaskAttempts =
|
||||
new HashMap<TaskAttemptID, TaskAttemptInfo>();
|
||||
TaskAttemptID taId1 = new TaskAttemptID(taskID, 2);
|
||||
TaskAttemptInfo mockTAinfo1 = getMockTaskAttemptInfo(taId1,
|
||||
TaskAttemptState.KILLED);
|
||||
mockTaskAttempts.put(taId1, mockTAinfo1);
|
||||
|
||||
TaskAttemptID taId2 = new TaskAttemptID(taskID, 1);
|
||||
TaskAttemptInfo mockTAinfo2 = getMockTaskAttemptInfo(taId2,
|
||||
TaskAttemptState.KILLED);
|
||||
mockTaskAttempts.put(taId2, mockTAinfo2);
|
||||
|
||||
OutputCommitter mockCommitter = mock (OutputCommitter.class);
|
||||
TaskInfo mockTaskInfo = mock(TaskInfo.class);
|
||||
when(mockTaskInfo.getTaskStatus()).thenReturn("KILLED");
|
||||
when(mockTaskInfo.getTaskId()).thenReturn(taskID);
|
||||
when(mockTaskInfo.getAllTaskAttempts()).thenReturn(mockTaskAttempts);
|
||||
|
||||
recoverMapTask.handle(
|
||||
new TaskRecoverEvent(taskId, mockTaskInfo, mockCommitter, true));
|
||||
|
||||
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
|
||||
verify(mockEventHandler,atLeast(1)).handle(
|
||||
(org.apache.hadoop.yarn.event.Event) arg.capture());
|
||||
|
||||
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates =
|
||||
new HashMap<TaskAttemptID, TaskAttemptState>();
|
||||
finalAttemptStates.put(taId1, TaskAttemptState.KILLED);
|
||||
finalAttemptStates.put(taId2, TaskAttemptState.KILLED);
|
||||
|
||||
List<EventType> jobHistoryEvents = new ArrayList<EventType>();
|
||||
jobHistoryEvents.add(EventType.TASK_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_STARTED);
|
||||
jobHistoryEvents.add(EventType.MAP_ATTEMPT_KILLED);
|
||||
jobHistoryEvents.add(EventType.TASK_FAILED);
|
||||
recoveryChecker(recoverMapTask, TaskState.KILLED, finalAttemptStates,
|
||||
arg, jobHistoryEvents, 2L, 0L);
|
||||
}
|
||||
|
||||
private void recoveryChecker(MapTaskImpl checkTask, TaskState finalState,
|
||||
Map<TaskAttemptID, TaskAttemptState> finalAttemptStates,
|
||||
ArgumentCaptor<Event> arg, List<EventType> expectedJobHistoryEvents,
|
||||
long expectedMapLaunches, long expectedFailedMaps) {
|
||||
|
||||
assertEquals("Final State of Task", finalState, checkTask.getState());
|
||||
|
||||
Map<TaskAttemptId, TaskAttempt> recoveredAttempts =
|
||||
checkTask.getAttempts();
|
||||
assertEquals("Expected Number of Task Attempts",
|
||||
finalAttemptStates.size(), recoveredAttempts.size());
|
||||
for (TaskAttemptID taID : finalAttemptStates.keySet()) {
|
||||
assertEquals("Expected Task Attempt State",
|
||||
finalAttemptStates.get(taID),
|
||||
recoveredAttempts.get(TypeConverter.toYarn(taID)).getState());
|
||||
}
|
||||
|
||||
Iterator<Event> ie = arg.getAllValues().iterator();
|
||||
int eventNum = 0;
|
||||
long totalLaunchedMaps = 0;
|
||||
long totalFailedMaps = 0;
|
||||
boolean jobTaskEventReceived = false;
|
||||
|
||||
while (ie.hasNext()) {
|
||||
Object current = ie.next();
|
||||
++eventNum;
|
||||
LOG.info(eventNum + " " + current.getClass().getName());
|
||||
if (current instanceof JobHistoryEvent) {
|
||||
JobHistoryEvent jhe = (JobHistoryEvent) current;
|
||||
LOG.info(expectedJobHistoryEvents.get(0).toString() + " " +
|
||||
jhe.getHistoryEvent().getEventType().toString() + " " +
|
||||
jhe.getJobID());
|
||||
assertEquals(expectedJobHistoryEvents.get(0),
|
||||
jhe.getHistoryEvent().getEventType());
|
||||
expectedJobHistoryEvents.remove(0);
|
||||
} else if (current instanceof JobCounterUpdateEvent) {
|
||||
JobCounterUpdateEvent jcue = (JobCounterUpdateEvent) current;
|
||||
|
||||
LOG.info("JobCounterUpdateEvent "
|
||||
+ jcue.getCounterUpdates().get(0).getCounterKey()
|
||||
+ " " + jcue.getCounterUpdates().get(0).getIncrementValue());
|
||||
if (jcue.getCounterUpdates().get(0).getCounterKey() ==
|
||||
JobCounter.NUM_FAILED_MAPS) {
|
||||
totalFailedMaps += jcue.getCounterUpdates().get(0)
|
||||
.getIncrementValue();
|
||||
} else if (jcue.getCounterUpdates().get(0).getCounterKey() ==
|
||||
JobCounter.TOTAL_LAUNCHED_MAPS) {
|
||||
totalLaunchedMaps += jcue.getCounterUpdates().get(0)
|
||||
.getIncrementValue();
|
||||
}
|
||||
} else if (current instanceof JobTaskEvent) {
|
||||
JobTaskEvent jte = (JobTaskEvent) current;
|
||||
assertEquals(jte.getState(), finalState);
|
||||
jobTaskEventReceived = true;
|
||||
}
|
||||
}
|
||||
assertTrue(jobTaskEventReceived || (finalState == TaskState.RUNNING));
|
||||
assertEquals("Did not process all expected JobHistoryEvents",
|
||||
0, expectedJobHistoryEvents.size());
|
||||
assertEquals("Expected Map Launches",
|
||||
expectedMapLaunches, totalLaunchedMaps);
|
||||
assertEquals("Expected Failed Maps",
|
||||
expectedFailedMaps, totalFailedMaps);
|
||||
}
|
||||
|
||||
private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
|
||||
|
||||
ApplicationId appId = BuilderUtils.newApplicationId(clusterTimestamp, 1);
|
||||
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
||||
|
||||
int partitions = 2;
|
||||
|
||||
Path remoteJobConfFile = mock(Path.class);
|
||||
JobConf conf = new JobConf();
|
||||
TaskAttemptListener taskAttemptListener = mock(TaskAttemptListener.class);
|
||||
Token<JobTokenIdentifier> jobToken =
|
||||
(Token<JobTokenIdentifier>) mock(Token.class);
|
||||
Credentials credentials = null;
|
||||
Clock clock = new SystemClock();
|
||||
int appAttemptId = 3;
|
||||
MRAppMetrics metrics = mock(MRAppMetrics.class);
|
||||
Resource minContainerRequirements = mock(Resource.class);
|
||||
when(minContainerRequirements.getMemory()).thenReturn(1000);
|
||||
|
||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||
when(clusterInfo.getMinContainerCapability()).thenReturn(
|
||||
minContainerRequirements);
|
||||
AppContext appContext = mock(AppContext.class);
|
||||
when(appContext.getClusterInfo()).thenReturn(clusterInfo);
|
||||
|
||||
TaskSplitMetaInfo taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
|
||||
MapTaskImpl mapTask = new MapTaskImpl(jobId, partitions,
|
||||
eh, remoteJobConfFile, conf,
|
||||
taskSplitMetaInfo, taskAttemptListener, jobToken, credentials, clock,
|
||||
appAttemptId, metrics, appContext);
|
||||
return mapTask;
|
||||
}
|
||||
|
||||
private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai,
|
||||
TaskAttemptState tas) {
|
||||
|
||||
ContainerId ci = mock(ContainerId.class);
|
||||
Counters counters = mock(Counters.class);
|
||||
TaskType tt = TaskType.MAP;
|
||||
|
||||
long finishTime = System.currentTimeMillis();
|
||||
|
||||
TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class);
|
||||
|
||||
when(mockTAinfo.getAttemptId()).thenReturn(tai);
|
||||
when(mockTAinfo.getContainerId()).thenReturn(ci);
|
||||
when(mockTAinfo.getCounters()).thenReturn(counters);
|
||||
when(mockTAinfo.getError()).thenReturn("");
|
||||
when(mockTAinfo.getFinishTime()).thenReturn(finishTime);
|
||||
when(mockTAinfo.getHostname()).thenReturn("localhost");
|
||||
when(mockTAinfo.getHttpPort()).thenReturn(23);
|
||||
when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L);
|
||||
when(mockTAinfo.getPort()).thenReturn(24);
|
||||
when(mockTAinfo.getRackname()).thenReturn("defaultRack");
|
||||
when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L);
|
||||
when(mockTAinfo.getShufflePort()).thenReturn(25);
|
||||
when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L);
|
||||
when(mockTAinfo.getStartTime()).thenReturn(finishTime -10000);
|
||||
when(mockTAinfo.getState()).thenReturn("task in progress");
|
||||
when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString());
|
||||
when(mockTAinfo.getTaskType()).thenReturn(tt);
|
||||
when(mockTAinfo.getTrackerName()).thenReturn("TrackerName");
|
||||
return mockTAinfo;
|
||||
}
|
||||
|
||||
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
|
||||
throws Exception {
|
||||
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
|
||||
|
@ -1145,5 +1604,16 @@ public class TestRecovery {
|
|||
public static void main(String[] arg) throws Exception {
|
||||
TestRecovery test = new TestRecovery();
|
||||
test.testCrashed();
|
||||
test.testMultipleCrashes();
|
||||
test.testOutputRecovery();
|
||||
test.testOutputRecoveryMapsOnly();
|
||||
test.testRecoveryWithOldCommiter();
|
||||
test.testSpeculative();
|
||||
test.testRecoveryWithoutShuffleSecret();
|
||||
test.testRecoverySuccessAttempt();
|
||||
test.testRecoveryAllFailAttempts();
|
||||
test.testRecoveryTaskSuccessAllAttemptsFail();
|
||||
test.testRecoveryTaskSuccessAllAttemptsSucceed();
|
||||
test.testRecoveryAllAttemptsKilled();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -316,7 +316,8 @@ import org.junit.Test;
|
|||
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
|
||||
getDispatcher().getEventHandler(),
|
||||
getTaskAttemptListener(), getContext().getClock(),
|
||||
isNewApiCommitter(), currentUser.getUserName(), getContext(),
|
||||
getCommitter(), isNewApiCommitter(),
|
||||
currentUser.getUserName(), getContext(),
|
||||
forcedState, diagnostic);
|
||||
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.mapreduce.JobACL;
|
|||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.EventType;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.JobStatus.State;
|
||||
|
@ -47,6 +49,7 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
||||
|
@ -57,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
|
|||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
|
||||
|
@ -69,7 +73,6 @@ import org.apache.hadoop.yarn.SystemClock;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.state.StateMachine;
|
||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
|
@ -133,7 +136,7 @@ public class TestJobImpl {
|
|||
JobImpl job = createStubbedJob(conf, dispatcher, 0);
|
||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
|
||||
job.handle(new JobStartEvent(job.getID()));
|
||||
assertJobState(job, JobStateInternal.SUCCEEDED);
|
||||
dispatcher.stop();
|
||||
commitHandler.stop();
|
||||
|
@ -222,7 +225,7 @@ public class TestJobImpl {
|
|||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
|
||||
job.handle(new JobStartEvent(jobId));
|
||||
assertJobState(job, JobStateInternal.SETUP);
|
||||
|
||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
|
||||
|
@ -284,7 +287,7 @@ public class TestJobImpl {
|
|||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
|
||||
job.handle(new JobStartEvent(jobId));
|
||||
assertJobState(job, JobStateInternal.SETUP);
|
||||
|
||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
|
||||
|
@ -351,7 +354,7 @@ public class TestJobImpl {
|
|||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
|
||||
job.handle(new JobStartEvent(jobId));
|
||||
assertJobState(job, JobStateInternal.FAIL_ABORT);
|
||||
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
|
||||
|
@ -388,7 +391,7 @@ public class TestJobImpl {
|
|||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
|
||||
job.handle(new JobStartEvent(jobId));
|
||||
assertJobState(job, JobStateInternal.SETUP);
|
||||
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
|
||||
|
@ -428,7 +431,7 @@ public class TestJobImpl {
|
|||
|
||||
// Verify access
|
||||
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
|
||||
null, null, true, null, 0, null, null, null, null);
|
||||
null, null, null, true, null, 0, null, null, null, null);
|
||||
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
|
||||
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
|
||||
|
||||
|
@ -439,7 +442,7 @@ public class TestJobImpl {
|
|||
|
||||
// Verify access
|
||||
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
|
||||
null, null, true, null, 0, null, null, null, null);
|
||||
null, null, null, true, null, 0, null, null, null, null);
|
||||
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
|
||||
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
|
||||
|
||||
|
@ -450,7 +453,7 @@ public class TestJobImpl {
|
|||
|
||||
// Verify access
|
||||
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
|
||||
null, null, true, null, 0, null, null, null, null);
|
||||
null, null, null, true, null, 0, null, null, null, null);
|
||||
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
|
||||
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
|
||||
|
||||
|
@ -461,7 +464,7 @@ public class TestJobImpl {
|
|||
|
||||
// Verify access
|
||||
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
|
||||
null, null, true, null, 0, null, null, null, null);
|
||||
null, null, null, true, null, 0, null, null, null, null);
|
||||
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
|
||||
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
|
||||
|
||||
|
@ -472,7 +475,7 @@ public class TestJobImpl {
|
|||
|
||||
// Verify access
|
||||
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
|
||||
null, null, true, null, 0, null, null, null, null);
|
||||
null, null, null, true, null, 0, null, null, null, null);
|
||||
Assert.assertTrue(job5.checkAccess(ugi1, null));
|
||||
Assert.assertTrue(job5.checkAccess(ugi2, null));
|
||||
}
|
||||
|
@ -490,7 +493,7 @@ public class TestJobImpl {
|
|||
mock(EventHandler.class),
|
||||
null, mock(JobTokenSecretManager.class), null,
|
||||
new SystemClock(), null,
|
||||
mrAppMetrics, true, null, 0, null, null, null, null);
|
||||
mrAppMetrics, null, true, null, 0, null, null, null, null);
|
||||
job.handle(diagUpdateEvent);
|
||||
String diagnostics = job.getReport().getDiagnostics();
|
||||
Assert.assertNotNull(diagnostics);
|
||||
|
@ -501,7 +504,7 @@ public class TestJobImpl {
|
|||
mock(EventHandler.class),
|
||||
null, mock(JobTokenSecretManager.class), null,
|
||||
new SystemClock(), null,
|
||||
mrAppMetrics, true, null, 0, null, null, null, null);
|
||||
mrAppMetrics, null, true, null, 0, null, null, null, null);
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
|
||||
job.handle(diagUpdateEvent);
|
||||
diagnostics = job.getReport().getDiagnostics();
|
||||
|
@ -556,7 +559,7 @@ public class TestJobImpl {
|
|||
JobImpl job = new JobImpl(jobId, Records
|
||||
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
|
||||
null, new JobTokenSecretManager(), new Credentials(), null, null,
|
||||
mrAppMetrics, true, null, 0, null, null, null, null);
|
||||
mrAppMetrics, null, true, null, 0, null, null, null, null);
|
||||
InitTransition initTransition = getInitTransition(2);
|
||||
JobEvent mockJobEvent = mock(JobEvent.class);
|
||||
initTransition.transition(job, mockJobEvent);
|
||||
|
@ -597,7 +600,7 @@ public class TestJobImpl {
|
|||
JobId jobId = job.getID();
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_START));
|
||||
job.handle(new JobStartEvent(jobId));
|
||||
assertJobState(job, JobStateInternal.FAILED);
|
||||
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
|
||||
|
@ -661,7 +664,7 @@ public class TestJobImpl {
|
|||
StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
|
||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
|
||||
assertJobState(job, JobStateInternal.INITED);
|
||||
job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
|
||||
job.handle(new JobStartEvent(job.getID()));
|
||||
assertJobState(job, JobStateInternal.RUNNING);
|
||||
return job;
|
||||
}
|
||||
|
@ -785,9 +788,9 @@ public class TestJobImpl {
|
|||
boolean newApiCommitter, String user, int numSplits) {
|
||||
super(jobId, applicationAttemptId, conf, eventHandler,
|
||||
null, new JobTokenSecretManager(), new Credentials(),
|
||||
new SystemClock(), null, MRAppMetrics.create(),
|
||||
newApiCommitter, user, System.currentTimeMillis(), null, null, null,
|
||||
null);
|
||||
new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
|
||||
MRAppMetrics.create(), null, newApiCommitter, user,
|
||||
System.currentTimeMillis(), null, null, null, null);
|
||||
|
||||
initTransition = getInitTransition(numSplits);
|
||||
localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
|
||||
|
|
|
@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
|||
import org.apache.hadoop.mapreduce.Counter;
|
||||
import org.apache.hadoop.mapreduce.Counters;
|
||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
|
||||
|
@ -80,7 +78,6 @@ public class TestTaskImpl {
|
|||
private Path remoteJobConfFile;
|
||||
private Credentials credentials;
|
||||
private Clock clock;
|
||||
private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
|
||||
private MRAppMetrics metrics;
|
||||
private TaskImpl mockTask;
|
||||
private ApplicationId appId;
|
||||
|
@ -104,13 +101,12 @@ public class TestTaskImpl {
|
|||
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
|
||||
TaskAttemptListener taskAttemptListener,
|
||||
Token<JobTokenIdentifier> jobToken,
|
||||
Credentials credentials, Clock clock,
|
||||
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
|
||||
Credentials credentials, Clock clock, int startCount,
|
||||
MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
|
||||
super(jobId, taskType , partition, eventHandler,
|
||||
remoteJobConfFile, conf, taskAttemptListener,
|
||||
jobToken, credentials, clock,
|
||||
completedTasksFromPreviousRun, startCount, metrics, appContext);
|
||||
startCount, metrics, appContext);
|
||||
this.taskType = taskType;
|
||||
}
|
||||
|
||||
|
@ -247,8 +243,7 @@ public class TestTaskImpl {
|
|||
return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
|
||||
remoteJobConfFile, conf, taskAttemptListener, jobToken,
|
||||
credentials, clock,
|
||||
completedTasksFromPreviousRun, startCount,
|
||||
metrics, appContext, taskType);
|
||||
startCount, metrics, appContext, taskType);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -652,9 +647,7 @@ public class TestTaskImpl {
|
|||
public void testFailedTransitions() {
|
||||
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
|
||||
remoteJobConfFile, conf, taskAttemptListener, jobToken,
|
||||
credentials, clock,
|
||||
completedTasksFromPreviousRun, startCount,
|
||||
metrics, appContext, TaskType.MAP) {
|
||||
credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
|
||||
@Override
|
||||
protected int getMaxAttempts() {
|
||||
return 1;
|
||||
|
@ -721,9 +714,7 @@ public class TestTaskImpl {
|
|||
public void testCountersWithSpeculation() {
|
||||
mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
|
||||
remoteJobConfFile, conf, taskAttemptListener, jobToken,
|
||||
credentials, clock,
|
||||
completedTasksFromPreviousRun, startCount,
|
||||
metrics, appContext, TaskType.MAP) {
|
||||
credentials, clock, startCount, metrics, appContext, TaskType.MAP) {
|
||||
@Override
|
||||
protected int getMaxAttempts() {
|
||||
return 1;
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|||
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
@ -525,4 +526,19 @@ public class JobHistoryUtils {
|
|||
sb.append(jobId.toString());
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public static Path getPreviousJobHistoryPath(
|
||||
Configuration conf, ApplicationAttemptId applicationAttemptId)
|
||||
throws IOException {
|
||||
String jobId =
|
||||
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
|
||||
.toString();
|
||||
String jobhistoryDir =
|
||||
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
|
||||
Path histDirPath = FileContext.getFileContext(conf).makeQualified(
|
||||
new Path(jobhistoryDir));
|
||||
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
|
||||
return fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
|
||||
histDirPath,jobId, (applicationAttemptId.getAttemptId() - 1)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -422,6 +422,7 @@ public interface MRJobConfig {
|
|||
/** Enable job recovery.*/
|
||||
public static final String MR_AM_JOB_RECOVERY_ENABLE =
|
||||
MR_AM_PREFIX + "job.recovery.enable";
|
||||
public static final boolean MR_AM_JOB_RECOVERY_ENABLE_DEFAULT = true;
|
||||
|
||||
/**
|
||||
* Limit on the number of reducers that can be preempted to ensure that at
|
||||
|
|
Loading…
Reference in New Issue