MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol for Job, Task and TaskAttempt. Contributed by Siddarth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1399976 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-10-19 05:57:57 +00:00
parent a4f30ff53c
commit 022f7b4a25
21 changed files with 570 additions and 365 deletions

View File

@ -570,6 +570,9 @@ Release 0.23.5 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol
for Job, Task and TaskAttempt. (Siddarth Seth via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job;
public enum JobStateInternal {
NEW,
INITED,
RUNNING,
SUCCEEDED,
FAILED,
KILL_WAIT,
KILLED,
ERROR
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job;
import org.apache.hadoop.classification.InterfaceAudience.Private;
/**
* TaskAttemptImpl internal state machine states.
*
*/
@Private
public enum TaskAttemptStateInternal {
NEW,
UNASSIGNED,
ASSIGNED,
RUNNING,
COMMIT_PENDING,
SUCCESS_CONTAINER_CLEANUP,
SUCCEEDED,
FAIL_CONTAINER_CLEANUP,
FAIL_TASK_CLEANUP,
FAILED,
KILL_CONTAINER_CLEANUP,
KILL_TASK_CLEANUP,
KILLED,
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.job;
public enum TaskStateInternal {
NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
}

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
@ -210,163 +211,163 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
new UpdatedNodesTransition(); new UpdatedNodesTransition();
protected static final protected static final
StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
stateMachineFactory stateMachineFactory
= new StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
(JobState.NEW) (JobStateInternal.NEW)
// Transitions from NEW state // Transitions from NEW state
.addTransition(JobState.NEW, JobState.NEW, .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_DIAGNOSTIC_UPDATE, JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION) DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.NEW, JobState.NEW, .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition .addTransition
(JobState.NEW, (JobStateInternal.NEW,
EnumSet.of(JobState.INITED, JobState.FAILED), EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
JobEventType.JOB_INIT, JobEventType.JOB_INIT,
new InitTransition()) new InitTransition())
.addTransition(JobState.NEW, JobState.KILLED, .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
JobEventType.JOB_KILL, JobEventType.JOB_KILL,
new KillNewJobTransition()) new KillNewJobTransition())
.addTransition(JobState.NEW, JobState.ERROR, .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION) INTERNAL_ERROR_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition(JobState.NEW, JobState.NEW, .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_UPDATED_NODES) JobEventType.JOB_UPDATED_NODES)
// Transitions from INITED state // Transitions from INITED state
.addTransition(JobState.INITED, JobState.INITED, .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_DIAGNOSTIC_UPDATE, JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION) DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.INITED, JobState.INITED, .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobState.INITED, JobState.RUNNING, .addTransition(JobStateInternal.INITED, JobStateInternal.RUNNING,
JobEventType.JOB_START, JobEventType.JOB_START,
new StartTransition()) new StartTransition())
.addTransition(JobState.INITED, JobState.KILLED, .addTransition(JobStateInternal.INITED, JobStateInternal.KILLED,
JobEventType.JOB_KILL, JobEventType.JOB_KILL,
new KillInitedJobTransition()) new KillInitedJobTransition())
.addTransition(JobState.INITED, JobState.ERROR, .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION) INTERNAL_ERROR_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition(JobState.INITED, JobState.INITED, .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_UPDATED_NODES) JobEventType.JOB_UPDATED_NODES)
// Transitions from RUNNING state // Transitions from RUNNING state
.addTransition(JobState.RUNNING, JobState.RUNNING, .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED, JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition .addTransition
(JobState.RUNNING, (JobStateInternal.RUNNING,
EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED), EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
JobEventType.JOB_TASK_COMPLETED, JobEventType.JOB_TASK_COMPLETED,
new TaskCompletedTransition()) new TaskCompletedTransition())
.addTransition .addTransition
(JobState.RUNNING, (JobStateInternal.RUNNING,
EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED), EnumSet.of(JobStateInternal.RUNNING, JobStateInternal.SUCCEEDED, JobStateInternal.FAILED),
JobEventType.JOB_COMPLETED, JobEventType.JOB_COMPLETED,
new JobNoTasksCompletedTransition()) new JobNoTasksCompletedTransition())
.addTransition(JobState.RUNNING, JobState.KILL_WAIT, .addTransition(JobStateInternal.RUNNING, JobStateInternal.KILL_WAIT,
JobEventType.JOB_KILL, new KillTasksTransition()) JobEventType.JOB_KILL, new KillTasksTransition())
.addTransition(JobState.RUNNING, JobState.RUNNING, .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_UPDATED_NODES,
UPDATED_NODES_TRANSITION) UPDATED_NODES_TRANSITION)
.addTransition(JobState.RUNNING, JobState.RUNNING, .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_MAP_TASK_RESCHEDULED, JobEventType.JOB_MAP_TASK_RESCHEDULED,
new MapTaskRescheduledTransition()) new MapTaskRescheduledTransition())
.addTransition(JobState.RUNNING, JobState.RUNNING, .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_DIAGNOSTIC_UPDATE, JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION) DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.RUNNING, JobState.RUNNING, .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobState.RUNNING, JobState.RUNNING, .addTransition(JobStateInternal.RUNNING, JobStateInternal.RUNNING,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
new TaskAttemptFetchFailureTransition()) new TaskAttemptFetchFailureTransition())
.addTransition( .addTransition(
JobState.RUNNING, JobStateInternal.RUNNING,
JobState.ERROR, JobEventType.INTERNAL_ERROR, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION) INTERNAL_ERROR_TRANSITION)
// Transitions from KILL_WAIT state. // Transitions from KILL_WAIT state.
.addTransition .addTransition
(JobState.KILL_WAIT, (JobStateInternal.KILL_WAIT,
EnumSet.of(JobState.KILL_WAIT, JobState.KILLED), EnumSet.of(JobStateInternal.KILL_WAIT, JobStateInternal.KILLED),
JobEventType.JOB_TASK_COMPLETED, JobEventType.JOB_TASK_COMPLETED,
new KillWaitTaskCompletedTransition()) new KillWaitTaskCompletedTransition())
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT, .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
JobEventType.JOB_TASK_ATTEMPT_COMPLETED, JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION) TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT, .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
JobEventType.JOB_DIAGNOSTIC_UPDATE, JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION) DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT, .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition( .addTransition(
JobState.KILL_WAIT, JobStateInternal.KILL_WAIT,
JobState.ERROR, JobEventType.INTERNAL_ERROR, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION) INTERNAL_ERROR_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT, .addTransition(JobStateInternal.KILL_WAIT, JobStateInternal.KILL_WAIT,
EnumSet.of(JobEventType.JOB_KILL, EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_MAP_TASK_RESCHEDULED, JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from SUCCEEDED state // Transitions from SUCCEEDED state
.addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED, .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
JobEventType.JOB_DIAGNOSTIC_UPDATE, JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION) DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED, .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition( .addTransition(
JobState.SUCCEEDED, JobStateInternal.SUCCEEDED,
JobState.ERROR, JobEventType.INTERNAL_ERROR, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION) INTERNAL_ERROR_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED, .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
EnumSet.of(JobEventType.JOB_KILL, EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from FAILED state // Transitions from FAILED state
.addTransition(JobState.FAILED, JobState.FAILED, .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
JobEventType.JOB_DIAGNOSTIC_UPDATE, JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION) DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.FAILED, JobState.FAILED, .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition( .addTransition(
JobState.FAILED, JobStateInternal.FAILED,
JobState.ERROR, JobEventType.INTERNAL_ERROR, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION) INTERNAL_ERROR_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition(JobState.FAILED, JobState.FAILED, .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
EnumSet.of(JobEventType.JOB_KILL, EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from KILLED state // Transitions from KILLED state
.addTransition(JobState.KILLED, JobState.KILLED, .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
JobEventType.JOB_DIAGNOSTIC_UPDATE, JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION) DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.KILLED, JobState.KILLED, .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition( .addTransition(
JobState.KILLED, JobStateInternal.KILLED,
JobState.ERROR, JobEventType.INTERNAL_ERROR, JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION) INTERNAL_ERROR_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition(JobState.KILLED, JobState.KILLED, .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
EnumSet.of(JobEventType.JOB_KILL, EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// No transitions from INTERNAL_ERROR state. Ignore all. // No transitions from INTERNAL_ERROR state. Ignore all.
.addTransition( .addTransition(
JobState.ERROR, JobStateInternal.ERROR,
JobState.ERROR, JobStateInternal.ERROR,
EnumSet.of(JobEventType.JOB_INIT, EnumSet.of(JobEventType.JOB_INIT,
JobEventType.JOB_KILL, JobEventType.JOB_KILL,
JobEventType.JOB_TASK_COMPLETED, JobEventType.JOB_TASK_COMPLETED,
@ -376,12 +377,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.INTERNAL_ERROR)) JobEventType.INTERNAL_ERROR))
.addTransition(JobState.ERROR, JobState.ERROR, .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
// create the topology tables // create the topology tables
.installTopology(); .installTopology();
private final StateMachine<JobState, JobEventType, JobEvent> stateMachine; private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
//changing fields while the job is running //changing fields while the job is running
private int numMapTasks; private int numMapTasks;
@ -446,7 +447,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
stateMachine = stateMachineFactory.make(this); stateMachine = stateMachineFactory.make(this);
} }
protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() { protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
return stateMachine; return stateMachine;
} }
@ -520,9 +521,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
readLock.lock(); readLock.lock();
try { try {
JobState state = getState(); JobStateInternal state = getInternalState();
if (state == JobState.ERROR || state == JobState.FAILED if (state == JobStateInternal.ERROR || state == JobStateInternal.FAILED
|| state == JobState.KILLED || state == JobState.SUCCEEDED) { || state == JobStateInternal.KILLED || state == JobStateInternal.SUCCEEDED) {
this.mayBeConstructFinalFullCounters(); this.mayBeConstructFinalFullCounters();
return fullCounters; return fullCounters;
} }
@ -587,7 +588,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
diagsb.append(s).append("\n"); diagsb.append(s).append("\n");
} }
if (getState() == JobState.NEW) { if (getInternalState() == JobStateInternal.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state, return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f, appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
cleanupProgress, jobFile, amInfos, isUber, diagsb.toString()); cleanupProgress, jobFile, amInfos, isUber, diagsb.toString());
@ -674,7 +675,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
public JobState getState() { public JobState getState() {
readLock.lock(); readLock.lock();
try { try {
return getStateMachine().getCurrentState(); return getExternalState(getStateMachine().getCurrentState());
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -695,7 +696,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
LOG.debug("Processing " + event.getJobId() + " of type " + event.getType()); LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
try { try {
writeLock.lock(); writeLock.lock();
JobState oldState = getState(); JobStateInternal oldState = getInternalState();
try { try {
getStateMachine().doTransition(event.getType(), event); getStateMachine().doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) { } catch (InvalidStateTransitonException e) {
@ -706,9 +707,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobEventType.INTERNAL_ERROR)); JobEventType.INTERNAL_ERROR));
} }
//notify the eventhandler of state change //notify the eventhandler of state change
if (oldState != getState()) { if (oldState != getInternalState()) {
LOG.info(jobId + "Job Transitioned from " + oldState + " to " LOG.info(jobId + "Job Transitioned from " + oldState + " to "
+ getState()); + getInternalState());
} }
} }
@ -717,6 +718,25 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
} }
} }
@Private
public JobStateInternal getInternalState() {
readLock.lock();
try {
return getStateMachine().getCurrentState();
} finally {
readLock.unlock();
}
}
private static JobState getExternalState(JobStateInternal smState) {
if (smState == JobStateInternal.KILL_WAIT) {
return JobState.KILLED;
} else {
return JobState.valueOf(smState.name());
}
}
//helpful in testing //helpful in testing
protected void addTask(Task task) { protected void addTask(Task task) {
synchronized (tasksSyncHandle) { synchronized (tasksSyncHandle) {
@ -757,7 +777,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
return FileSystem.get(conf); return FileSystem.get(conf);
} }
static JobState checkJobCompleteSuccess(JobImpl job) { static JobStateInternal checkJobCompleteSuccess(JobImpl job) {
// check for Job success // check for Job success
if (job.completedTaskCount == job.tasks.size()) { if (job.completedTaskCount == job.tasks.size()) {
try { try {
@ -767,16 +787,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
LOG.error("Could not do commit for Job", e); LOG.error("Could not do commit for Job", e);
job.addDiagnostic("Job commit failed: " + e.getMessage()); job.addDiagnostic("Job commit failed: " + e.getMessage());
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
return job.finished(JobState.FAILED); return job.finished(JobStateInternal.FAILED);
} }
job.logJobHistoryFinishedEvent(); job.logJobHistoryFinishedEvent();
return job.finished(JobState.SUCCEEDED); return job.finished(JobStateInternal.SUCCEEDED);
} }
return null; return null;
} }
JobState finished(JobState finalState) { JobStateInternal finished(JobStateInternal finalState) {
if (getState() == JobState.RUNNING) { if (getInternalState() == JobStateInternal.RUNNING) {
metrics.endRunningJob(this); metrics.endRunningJob(this);
} }
if (finishTime == 0) setFinishTime(); if (finishTime == 0) setFinishTime();
@ -989,7 +1009,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
*/ */
public static class InitTransition public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> { implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
/** /**
* Note that this transition method is called directly (and synchronously) * Note that this transition method is called directly (and synchronously)
@ -999,7 +1019,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
* way; MR version is). * way; MR version is).
*/ */
@Override @Override
public JobState transition(JobImpl job, JobEvent event) { public JobStateInternal transition(JobImpl job, JobEvent event) {
job.metrics.submittedJob(job); job.metrics.submittedJob(job);
job.metrics.preparingJob(job); job.metrics.preparingJob(job);
try { try {
@ -1065,7 +1085,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
createReduceTasks(job); createReduceTasks(job);
job.metrics.endPreparingJob(job); job.metrics.endPreparingJob(job);
return JobState.INITED; return JobStateInternal.INITED;
//TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition) //TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition)
} catch (IOException e) { } catch (IOException e) {
@ -1074,7 +1094,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
+ StringUtils.stringifyException(e)); + StringUtils.stringifyException(e));
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
job.metrics.endPreparingJob(job); job.metrics.endPreparingJob(job);
return job.finished(JobState.FAILED); return job.finished(JobStateInternal.FAILED);
} }
} }
@ -1282,9 +1302,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobUnsuccessfulCompletionEvent failedEvent = JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId, new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0, job.finishTime, 0, 0,
JobState.KILLED.toString()); JobStateInternal.KILLED.toString());
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(JobState.KILLED); job.finished(JobStateInternal.KILLED);
} }
} }
@ -1294,7 +1314,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
public void transition(JobImpl job, JobEvent event) { public void transition(JobImpl job, JobEvent event) {
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
job.addDiagnostic("Job received Kill in INITED state."); job.addDiagnostic("Job received Kill in INITED state.");
job.finished(JobState.KILLED); job.finished(JobStateInternal.KILLED);
} }
} }
@ -1394,10 +1414,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
} }
private static class TaskCompletedTransition implements private static class TaskCompletedTransition implements
MultipleArcTransition<JobImpl, JobEvent, JobState> { MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
@Override @Override
public JobState transition(JobImpl job, JobEvent event) { public JobStateInternal transition(JobImpl job, JobEvent event) {
job.completedTaskCount++; job.completedTaskCount++;
LOG.info("Num completed Tasks: " + job.completedTaskCount); LOG.info("Num completed Tasks: " + job.completedTaskCount);
JobTaskEvent taskEvent = (JobTaskEvent) event; JobTaskEvent taskEvent = (JobTaskEvent) event;
@ -1413,7 +1433,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
return checkJobForCompletion(job); return checkJobForCompletion(job);
} }
protected JobState checkJobForCompletion(JobImpl job) { protected JobStateInternal checkJobForCompletion(JobImpl job) {
//check for Job failure //check for Job failure
if (job.failedMapTaskCount*100 > if (job.failedMapTaskCount*100 >
job.allowedMapFailuresPercent*job.numMapTasks || job.allowedMapFailuresPercent*job.numMapTasks ||
@ -1427,16 +1447,16 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
LOG.info(diagnosticMsg); LOG.info(diagnosticMsg);
job.addDiagnostic(diagnosticMsg); job.addDiagnostic(diagnosticMsg);
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
return job.finished(JobState.FAILED); return job.finished(JobStateInternal.FAILED);
} }
JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job); JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
if (jobCompleteSuccess != null) { if (jobCompleteSuccess != null) {
return jobCompleteSuccess; return jobCompleteSuccess;
} }
//return the current state, Job not finished yet //return the current state, Job not finished yet
return job.getState(); return job.getInternalState();
} }
private void taskSucceeded(JobImpl job, Task task) { private void taskSucceeded(JobImpl job, Task task) {
@ -1470,17 +1490,17 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// Transition class for handling jobs with no tasks // Transition class for handling jobs with no tasks
static class JobNoTasksCompletedTransition implements static class JobNoTasksCompletedTransition implements
MultipleArcTransition<JobImpl, JobEvent, JobState> { MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
@Override @Override
public JobState transition(JobImpl job, JobEvent event) { public JobStateInternal transition(JobImpl job, JobEvent event) {
JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job); JobStateInternal jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
if (jobCompleteSuccess != null) { if (jobCompleteSuccess != null) {
return jobCompleteSuccess; return jobCompleteSuccess;
} }
// Return the current state, Job not finished yet // Return the current state, Job not finished yet
return job.getState(); return job.getInternalState();
} }
} }
@ -1497,14 +1517,14 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private static class KillWaitTaskCompletedTransition extends private static class KillWaitTaskCompletedTransition extends
TaskCompletedTransition { TaskCompletedTransition {
@Override @Override
protected JobState checkJobForCompletion(JobImpl job) { protected JobStateInternal checkJobForCompletion(JobImpl job) {
if (job.completedTaskCount == job.tasks.size()) { if (job.completedTaskCount == job.tasks.size()) {
job.setFinishTime(); job.setFinishTime();
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
return job.finished(JobState.KILLED); return job.finished(JobStateInternal.KILLED);
} }
//return the current state, Job not finished yet //return the current state, Job not finished yet
return job.getState(); return job.getInternalState();
} }
} }
@ -1558,9 +1578,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobUnsuccessfulCompletionEvent failedEvent = JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId, new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0, job.finishTime, 0, 0,
JobState.ERROR.toString()); JobStateInternal.ERROR.toString());
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(JobState.ERROR); job.finished(JobStateInternal.ERROR);
} }
} }

View File

@ -39,6 +39,7 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -72,10 +73,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@ -88,7 +89,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
@ -132,6 +132,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
/** /**
@ -184,149 +185,149 @@ public abstract class TaskAttemptImpl implements
= new DiagnosticInformationUpdater(); = new DiagnosticInformationUpdater();
private static final StateMachineFactory private static final StateMachineFactory
<TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachineFactory stateMachineFactory
= new StateMachineFactory = new StateMachineFactory
<TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
(TaskAttemptState.NEW) (TaskAttemptStateInternal.NEW)
// Transitions from the NEW state. // Transitions from the NEW state.
.addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED, .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false)) TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
.addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED, .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true)) TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))
.addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED, .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new KilledTransition()) TaskAttemptEventType.TA_KILL, new KilledTransition())
.addTransition(TaskAttemptState.NEW, TaskAttemptState.FAILED, .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new FailedTransition()) TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
// Transitions from the UNASSIGNED state. // Transitions from the UNASSIGNED state.
.addTransition(TaskAttemptState.UNASSIGNED, .addTransition(TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptState.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED, TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
new ContainerAssignedTransition()) new ContainerAssignedTransition())
.addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.KILLED, .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition( TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
TaskAttemptState.KILLED, true)) TaskAttemptStateInternal.KILLED, true))
.addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.FAILED, .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition( TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
TaskAttemptState.FAILED, true)) TaskAttemptStateInternal.FAILED, true))
// Transitions from the ASSIGNED state. // Transitions from the ASSIGNED state.
.addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.RUNNING, .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
new LaunchedContainerTransition()) new LaunchedContainerTransition())
.addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.ASSIGNED, .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.ASSIGNED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED, .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
new DeallocateContainerTransition(TaskAttemptState.FAILED, false)) new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
.addTransition(TaskAttemptState.ASSIGNED, .addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
CLEANUP_CONTAINER_TRANSITION) CLEANUP_CONTAINER_TRANSITION)
// ^ If RM kills the container due to expiry, preemption etc. // ^ If RM kills the container due to expiry, preemption etc.
.addTransition(TaskAttemptState.ASSIGNED, .addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptState.ASSIGNED, .addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
// Transitions from RUNNING state. // Transitions from RUNNING state.
.addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_UPDATE, new StatusUpdater()) TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
.addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// If no commit is required, task directly goes to success // If no commit is required, task directly goes to success
.addTransition(TaskAttemptState.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
// If commit is required, task goes through commit pending state. // If commit is required, task goes through commit pending state.
.addTransition(TaskAttemptState.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptState.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition()) TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
// Failure handling while RUNNING // Failure handling while RUNNING
.addTransition(TaskAttemptState.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
//for handling container exit without sending the done or fail msg //for handling container exit without sending the done or fail msg
.addTransition(TaskAttemptState.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
CLEANUP_CONTAINER_TRANSITION) CLEANUP_CONTAINER_TRANSITION)
// Timeout handling while RUNNING // Timeout handling while RUNNING
.addTransition(TaskAttemptState.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
// if container killed by AM shutting down // if container killed by AM shutting down
.addTransition(TaskAttemptState.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptState.KILLED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
// Kill handling // Kill handling
.addTransition(TaskAttemptState.RUNNING, .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION) CLEANUP_CONTAINER_TRANSITION)
// Transitions from COMMIT_PENDING state // Transitions from COMMIT_PENDING state
.addTransition(TaskAttemptState.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE, TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
new StatusUpdater()) new StatusUpdater())
.addTransition(TaskAttemptState.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptState.COMMIT_PENDING, TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptState.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptState.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION) CLEANUP_CONTAINER_TRANSITION)
// if container killed by AM shutting down // if container killed by AM shutting down
.addTransition(TaskAttemptState.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptState.KILLED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition()) TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
.addTransition(TaskAttemptState.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptState.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
CLEANUP_CONTAINER_TRANSITION) CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptState.COMMIT_PENDING, .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION) TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
// Transitions from SUCCESS_CONTAINER_CLEANUP state // Transitions from SUCCESS_CONTAINER_CLEANUP state
// kill and cleanup the container // kill and cleanup the container
.addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
new SucceededTransition()) new SucceededTransition())
.addTransition( .addTransition(
TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, .addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptState.SUCCESS_CONTAINER_CLEANUP, TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL, EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_CONTAINER_COMPLETED)) TaskAttemptEventType.TA_CONTAINER_COMPLETED))
// Transitions from FAIL_CONTAINER_CLEANUP state. // Transitions from FAIL_CONTAINER_CLEANUP state.
.addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP, .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptState.FAIL_TASK_CLEANUP, TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
.addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP, .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP, .addTransition(TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptState.FAIL_CONTAINER_CLEANUP, TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL, EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE, TaskAttemptEventType.TA_UPDATE,
@ -339,17 +340,17 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_TIMED_OUT)) TaskAttemptEventType.TA_TIMED_OUT))
// Transitions from KILL_CONTAINER_CLEANUP // Transitions from KILL_CONTAINER_CLEANUP
.addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP, .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptState.KILL_TASK_CLEANUP, TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition()) TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
.addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP, .addTransition(TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition( .addTransition(
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL, EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE, TaskAttemptEventType.TA_UPDATE,
@ -362,16 +363,16 @@ public abstract class TaskAttemptImpl implements
// Transitions from FAIL_TASK_CLEANUP // Transitions from FAIL_TASK_CLEANUP
// run the task cleanup // run the task cleanup
.addTransition(TaskAttemptState.FAIL_TASK_CLEANUP, .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptState.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
new FailedTransition()) new FailedTransition())
.addTransition(TaskAttemptState.FAIL_TASK_CLEANUP, .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptState.FAIL_TASK_CLEANUP, TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition(TaskAttemptState.FAIL_TASK_CLEANUP, .addTransition(TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
TaskAttemptState.FAIL_TASK_CLEANUP, TaskAttemptStateInternal.FAIL_TASK_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL, EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE, TaskAttemptEventType.TA_UPDATE,
@ -384,16 +385,16 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
// Transitions from KILL_TASK_CLEANUP // Transitions from KILL_TASK_CLEANUP
.addTransition(TaskAttemptState.KILL_TASK_CLEANUP, .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptState.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
new KilledTransition()) new KilledTransition())
.addTransition(TaskAttemptState.KILL_TASK_CLEANUP, .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptState.KILL_TASK_CLEANUP, TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events // Ignore-able events
.addTransition(TaskAttemptState.KILL_TASK_CLEANUP, .addTransition(TaskAttemptStateInternal.KILL_TASK_CLEANUP,
TaskAttemptState.KILL_TASK_CLEANUP, TaskAttemptStateInternal.KILL_TASK_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL, EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
TaskAttemptEventType.TA_UPDATE, TaskAttemptEventType.TA_UPDATE,
@ -406,31 +407,31 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
// Transitions from SUCCEEDED // Transitions from SUCCEEDED
.addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts .addTransition(TaskAttemptStateInternal.SUCCEEDED, //only possible for map attempts
TaskAttemptState.FAILED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
new TooManyFetchFailureTransition()) new TooManyFetchFailureTransition())
.addTransition(TaskAttemptState.SUCCEEDED, .addTransition(TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED), EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED),
TaskAttemptEventType.TA_KILL, TaskAttemptEventType.TA_KILL,
new KilledAfterSuccessTransition()) new KilledAfterSuccessTransition())
.addTransition( .addTransition(
TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events for SUCCEEDED state // Ignore-able events for SUCCEEDED state
.addTransition(TaskAttemptState.SUCCEEDED, .addTransition(TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptState.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptEventType.TA_FAILMSG, EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED)) TaskAttemptEventType.TA_CONTAINER_COMPLETED))
// Transitions from FAILED state // Transitions from FAILED state
.addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events for FAILED state // Ignore-able events for FAILED state
.addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED, .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED,
EnumSet.of(TaskAttemptEventType.TA_KILL, EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
@ -445,11 +446,11 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE)) TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE))
// Transitions from KILLED state // Transitions from KILLED state
.addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION) DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Ignore-able events for KILLED state // Ignore-able events for KILLED state
.addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED, .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED,
EnumSet.of(TaskAttemptEventType.TA_KILL, EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED, TaskAttemptEventType.TA_CONTAINER_COMPLETED,
@ -466,7 +467,7 @@ public abstract class TaskAttemptImpl implements
.installTopology(); .installTopology();
private final StateMachine private final StateMachine
<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> <TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachine; stateMachine;
private ContainerId containerID; private ContainerId containerID;
@ -874,9 +875,9 @@ public abstract class TaskAttemptImpl implements
readLock.lock(); readLock.lock();
try { try {
// TODO: Use stateMachine level method? // TODO: Use stateMachine level method?
return (getState() == TaskAttemptState.SUCCEEDED || return (getInternalState() == TaskAttemptStateInternal.SUCCEEDED ||
getState() == TaskAttemptState.FAILED || getInternalState() == TaskAttemptStateInternal.FAILED ||
getState() == TaskAttemptState.KILLED); getInternalState() == TaskAttemptStateInternal.KILLED);
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -953,7 +954,7 @@ public abstract class TaskAttemptImpl implements
public TaskAttemptState getState() { public TaskAttemptState getState() {
readLock.lock(); readLock.lock();
try { try {
return stateMachine.getCurrentState(); return getExternalState(stateMachine.getCurrentState());
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -968,7 +969,7 @@ public abstract class TaskAttemptImpl implements
} }
writeLock.lock(); writeLock.lock();
try { try {
final TaskAttemptState oldState = getState(); final TaskAttemptStateInternal oldState = getInternalState() ;
try { try {
stateMachine.doTransition(event.getType(), event); stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) { } catch (InvalidStateTransitonException e) {
@ -980,16 +981,58 @@ public abstract class TaskAttemptImpl implements
eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(), eventHandler.handle(new JobEvent(this.attemptId.getTaskId().getJobId(),
JobEventType.INTERNAL_ERROR)); JobEventType.INTERNAL_ERROR));
} }
if (oldState != getState()) { if (oldState != getInternalState()) {
LOG.info(attemptId + " TaskAttempt Transitioned from " LOG.info(attemptId + " TaskAttempt Transitioned from "
+ oldState + " to " + oldState + " to "
+ getState()); + getInternalState());
} }
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
} }
@VisibleForTesting
public TaskAttemptStateInternal getInternalState() {
readLock.lock();
try {
return stateMachine.getCurrentState();
} finally {
readLock.unlock();
}
}
private static TaskAttemptState getExternalState(
TaskAttemptStateInternal smState) {
switch (smState) {
case ASSIGNED:
case UNASSIGNED:
return TaskAttemptState.STARTING;
case COMMIT_PENDING:
return TaskAttemptState.COMMIT_PENDING;
case FAILED:
return TaskAttemptState.FAILED;
case KILLED:
return TaskAttemptState.KILLED;
// All CLEANUP states considered as RUNNING since events have not gone out
// to the Task yet. May be possible to consider them as a Finished state.
case FAIL_CONTAINER_CLEANUP:
case FAIL_TASK_CLEANUP:
case KILL_CONTAINER_CLEANUP:
case KILL_TASK_CLEANUP:
case SUCCESS_CONTAINER_CLEANUP:
case RUNNING:
return TaskAttemptState.RUNNING;
case NEW:
return TaskAttemptState.NEW;
case SUCCEEDED:
return TaskAttemptState.SUCCEEDED;
default:
throw new YarnException("Attempt to convert invalid "
+ "stateMachineTaskAttemptState to externalTaskAttemptState: "
+ smState);
}
}
//always called in write lock //always called in write lock
private void setFinishTime() { private void setFinishTime() {
//set the finish time only if launch time is set //set the finish time only if launch time is set
@ -1066,7 +1109,7 @@ public abstract class TaskAttemptImpl implements
private static private static
TaskAttemptUnsuccessfulCompletionEvent TaskAttemptUnsuccessfulCompletionEvent
createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt, createTaskAttemptUnsuccessfulCompletionEvent(TaskAttemptImpl taskAttempt,
TaskAttemptState attemptState) { TaskAttemptStateInternal attemptState) {
TaskAttemptUnsuccessfulCompletionEvent tauce = TaskAttemptUnsuccessfulCompletionEvent tauce =
new TaskAttemptUnsuccessfulCompletionEvent( new TaskAttemptUnsuccessfulCompletionEvent(
TypeConverter.fromYarn(taskAttempt.attemptId), TypeConverter.fromYarn(taskAttempt.attemptId),
@ -1247,10 +1290,10 @@ public abstract class TaskAttemptImpl implements
private static class DeallocateContainerTransition implements private static class DeallocateContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
private final TaskAttemptState finalState; private final TaskAttemptStateInternal finalState;
private final boolean withdrawsContainerRequest; private final boolean withdrawsContainerRequest;
DeallocateContainerTransition DeallocateContainerTransition
(TaskAttemptState finalState, boolean withdrawsContainerRequest) { (TaskAttemptStateInternal finalState, boolean withdrawsContainerRequest) {
this.finalState = finalState; this.finalState = finalState;
this.withdrawsContainerRequest = withdrawsContainerRequest; this.withdrawsContainerRequest = withdrawsContainerRequest;
} }
@ -1288,10 +1331,10 @@ public abstract class TaskAttemptImpl implements
TaskAttemptUnsuccessfulCompletionEvent tauce = TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
finalState); finalState);
if(finalState == TaskAttemptState.FAILED) { if(finalState == TaskAttemptStateInternal.FAILED) {
taskAttempt.eventHandler taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
} else if(finalState == TaskAttemptState.KILLED) { } else if(finalState == TaskAttemptStateInternal.KILLED) {
taskAttempt.eventHandler taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
} }
@ -1405,7 +1448,7 @@ public abstract class TaskAttemptImpl implements
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES, JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
slotMillis); slotMillis);
taskAttempt.eventHandler.handle(jce); taskAttempt.eventHandler.handle(jce);
taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED); taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_SUCCEEDED)); TaskEventType.T_ATTEMPT_SUCCEEDED));
@ -1428,10 +1471,10 @@ public abstract class TaskAttemptImpl implements
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, false)); .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce = TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptState.FAILED); TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce)); taskAttempt.attemptId.getTaskId().getJobId(), tauce));
// taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
// handling failed map/reduce events. // handling failed map/reduce events.
}else { }else {
LOG.debug("Not generating HistoryFinish event since start event not " + LOG.debug("Not generating HistoryFinish event since start event not " +
@ -1443,7 +1486,7 @@ public abstract class TaskAttemptImpl implements
} }
@SuppressWarnings({ "unchecked" }) @SuppressWarnings({ "unchecked" })
private void logAttemptFinishedEvent(TaskAttemptState state) { private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
//Log finished events only if an attempt started. //Log finished events only if an attempt started.
if (getLaunchTime() == 0) return; if (getLaunchTime() == 0) return;
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) { if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
@ -1500,7 +1543,7 @@ public abstract class TaskAttemptImpl implements
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, true)); .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
TaskAttemptUnsuccessfulCompletionEvent tauce = TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptState.FAILED); TaskAttemptStateInternal.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce)); taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else { }else {
@ -1513,11 +1556,11 @@ public abstract class TaskAttemptImpl implements
} }
private static class KilledAfterSuccessTransition implements private static class KilledAfterSuccessTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptState> { MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public TaskAttemptState transition(TaskAttemptImpl taskAttempt, public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) { TaskAttemptEvent event) {
if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) { if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
// after a reduce task has succeeded, its outputs are in safe in HDFS. // after a reduce task has succeeded, its outputs are in safe in HDFS.
@ -1530,7 +1573,7 @@ public abstract class TaskAttemptImpl implements
// ignore this for reduce tasks // ignore this for reduce tasks
LOG.info("Ignoring killed event for successful reduce task attempt" + LOG.info("Ignoring killed event for successful reduce task attempt" +
taskAttempt.getID().toString()); taskAttempt.getID().toString());
return TaskAttemptState.SUCCEEDED; return TaskAttemptStateInternal.SUCCEEDED;
} }
if(event instanceof TaskAttemptKillEvent) { if(event instanceof TaskAttemptKillEvent) {
TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event; TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
@ -1545,12 +1588,12 @@ public abstract class TaskAttemptImpl implements
taskAttempt.eventHandler taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, true)); .handle(createJobCounterUpdateEventTAKilled(taskAttempt, true));
TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent( TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
taskAttempt, TaskAttemptState.KILLED); taskAttempt, TaskAttemptStateInternal.KILLED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
.getTaskId().getJobId(), tauce)); .getTaskId().getJobId(), tauce));
taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
return TaskAttemptState.KILLED; return TaskAttemptStateInternal.KILLED;
} }
} }
@ -1568,14 +1611,14 @@ public abstract class TaskAttemptImpl implements
.handle(createJobCounterUpdateEventTAKilled(taskAttempt, false)); .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
TaskAttemptUnsuccessfulCompletionEvent tauce = TaskAttemptUnsuccessfulCompletionEvent tauce =
createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt, createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
TaskAttemptState.KILLED); TaskAttemptStateInternal.KILLED);
taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce)); taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else { }else {
LOG.debug("Not generating HistoryFinish event since start event not " + LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID()); "generated for taskAttempt: " + taskAttempt.getID());
} }
// taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); Not logging Map/Reduce attempts in case of failure. // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED)); TaskEventType.T_ATTEMPT_KILLED));

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@ -59,6 +58,7 @@ import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@ -85,6 +85,8 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Implementation of Task interface. * Implementation of Task interface.
*/ */
@ -127,62 +129,62 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
KILL_TRANSITION = new KillTransition(); KILL_TRANSITION = new KillTransition();
private static final StateMachineFactory private static final StateMachineFactory
<TaskImpl, TaskState, TaskEventType, TaskEvent> <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
stateMachineFactory stateMachineFactory
= new StateMachineFactory<TaskImpl, TaskState, TaskEventType, TaskEvent> = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
(TaskState.NEW) (TaskStateInternal.NEW)
// define the state machine of Task // define the state machine of Task
// Transitions from NEW state // Transitions from NEW state
.addTransition(TaskState.NEW, TaskState.SCHEDULED, .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
TaskEventType.T_SCHEDULE, new InitialScheduleTransition()) TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskState.NEW, TaskState.KILLED, .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
TaskEventType.T_KILL, new KillNewTransition()) TaskEventType.T_KILL, new KillNewTransition())
// Transitions from SCHEDULED state // Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING //when the first attempt is launched, the task state is set to RUNNING
.addTransition(TaskState.SCHEDULED, TaskState.RUNNING, .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition()) TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
.addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT, .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
TaskEventType.T_KILL, KILL_TRANSITION) TaskEventType.T_KILL, KILL_TRANSITION)
.addTransition(TaskState.SCHEDULED, TaskState.SCHEDULED, .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION) TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
.addTransition(TaskState.SCHEDULED, .addTransition(TaskStateInternal.SCHEDULED,
EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED), EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED, TaskEventType.T_ATTEMPT_FAILED,
new AttemptFailedTransition()) new AttemptFailedTransition())
// Transitions from RUNNING state // Transitions from RUNNING state
.addTransition(TaskState.RUNNING, TaskState.RUNNING, .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
.addTransition(TaskState.RUNNING, TaskState.RUNNING, .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_COMMIT_PENDING, TaskEventType.T_ATTEMPT_COMMIT_PENDING,
new AttemptCommitPendingTransition()) new AttemptCommitPendingTransition())
.addTransition(TaskState.RUNNING, TaskState.RUNNING, .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition()) TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
.addTransition(TaskState.RUNNING, TaskState.SUCCEEDED, .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_SUCCEEDED,
new AttemptSucceededTransition()) new AttemptSucceededTransition())
.addTransition(TaskState.RUNNING, TaskState.RUNNING, .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_KILLED,
ATTEMPT_KILLED_TRANSITION) ATTEMPT_KILLED_TRANSITION)
.addTransition(TaskState.RUNNING, .addTransition(TaskStateInternal.RUNNING,
EnumSet.of(TaskState.RUNNING, TaskState.FAILED), EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED, TaskEventType.T_ATTEMPT_FAILED,
new AttemptFailedTransition()) new AttemptFailedTransition())
.addTransition(TaskState.RUNNING, TaskState.KILL_WAIT, .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT,
TaskEventType.T_KILL, KILL_TRANSITION) TaskEventType.T_KILL, KILL_TRANSITION)
// Transitions from KILL_WAIT state // Transitions from KILL_WAIT state
.addTransition(TaskState.KILL_WAIT, .addTransition(TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskState.KILL_WAIT, TaskState.KILLED), EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_KILLED,
new KillWaitAttemptKilledTransition()) new KillWaitAttemptKilledTransition())
// Ignore-able transitions. // Ignore-able transitions.
.addTransition( .addTransition(
TaskState.KILL_WAIT, TaskStateInternal.KILL_WAIT,
TaskState.KILL_WAIT, TaskStateInternal.KILL_WAIT,
EnumSet.of(TaskEventType.T_KILL, EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ATTEMPT_LAUNCHED, TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_COMMIT_PENDING, TaskEventType.T_ATTEMPT_COMMIT_PENDING,
@ -191,32 +193,32 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventType.T_ADD_SPEC_ATTEMPT)) TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from SUCCEEDED state // Transitions from SUCCEEDED state
.addTransition(TaskState.SUCCEEDED, .addTransition(TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED), EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED, new RetroactiveFailureTransition()) TaskEventType.T_ATTEMPT_FAILED, new RetroactiveFailureTransition())
.addTransition(TaskState.SUCCEEDED, .addTransition(TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED), EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition()) TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition())
// Ignore-able transitions. // Ignore-able transitions.
.addTransition( .addTransition(
TaskState.SUCCEEDED, TaskState.SUCCEEDED, TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT, EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
TaskEventType.T_ATTEMPT_LAUNCHED)) TaskEventType.T_ATTEMPT_LAUNCHED))
// Transitions from FAILED state // Transitions from FAILED state
.addTransition(TaskState.FAILED, TaskState.FAILED, .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
EnumSet.of(TaskEventType.T_KILL, EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ADD_SPEC_ATTEMPT)) TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from KILLED state // Transitions from KILLED state
.addTransition(TaskState.KILLED, TaskState.KILLED, .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
EnumSet.of(TaskEventType.T_KILL, EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ADD_SPEC_ATTEMPT)) TaskEventType.T_ADD_SPEC_ATTEMPT))
// create the topology tables // create the topology tables
.installTopology(); .installTopology();
private final StateMachine<TaskState, TaskEventType, TaskEvent> private final StateMachine<TaskStateInternal, TaskEventType, TaskEvent>
stateMachine; stateMachine;
// By default, the next TaskAttempt number is zero. Changes during recovery // By default, the next TaskAttempt number is zero. Changes during recovery
@ -247,7 +249,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override @Override
public TaskState getState() { public TaskState getState() {
return stateMachine.getCurrentState(); readLock.lock();
try {
return getExternalState(getInternalState());
} finally {
readLock.unlock();
}
} }
public TaskImpl(JobId jobId, TaskType taskType, int partition, public TaskImpl(JobId jobId, TaskType taskType, int partition,
@ -356,9 +363,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
readLock.lock(); readLock.lock();
try { try {
// TODO: Use stateMachine level method? // TODO: Use stateMachine level method?
return (getState() == TaskState.SUCCEEDED || return (getInternalState() == TaskStateInternal.SUCCEEDED ||
getState() == TaskState.FAILED || getInternalState() == TaskStateInternal.FAILED ||
getState() == TaskState.KILLED); getInternalState() == TaskStateInternal.KILLED);
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -433,6 +440,24 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
} }
} }
@VisibleForTesting
public TaskStateInternal getInternalState() {
readLock.lock();
try {
return stateMachine.getCurrentState();
} finally {
readLock.unlock();
}
}
private static TaskState getExternalState(TaskStateInternal smState) {
if (smState == TaskStateInternal.KILL_WAIT) {
return TaskState.KILLED;
} else {
return TaskState.valueOf(smState.name());
}
}
//this is always called in read/write lock //this is always called in read/write lock
private long getLaunchTime() { private long getLaunchTime() {
long taskLaunchTime = 0; long taskLaunchTime = 0;
@ -484,8 +509,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
return finishTime; return finishTime;
} }
private TaskState finished(TaskState finalState) { private TaskStateInternal finished(TaskStateInternal finalState) {
if (getState() == TaskState.RUNNING) { if (getInternalState() == TaskStateInternal.RUNNING) {
metrics.endRunningTask(this); metrics.endRunningTask(this);
} }
return finalState; return finalState;
@ -500,11 +525,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
switch (at.getState()) { switch (at.getState()) {
// ignore all failed task attempts // ignore all failed task attempts
case FAIL_CONTAINER_CLEANUP:
case FAIL_TASK_CLEANUP:
case FAILED: case FAILED:
case KILL_CONTAINER_CLEANUP:
case KILL_TASK_CLEANUP:
case KILLED: case KILLED:
continue; continue;
} }
@ -605,7 +626,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
} }
try { try {
writeLock.lock(); writeLock.lock();
TaskState oldState = getState(); TaskStateInternal oldState = getInternalState();
try { try {
stateMachine.doTransition(event.getType(), event); stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) { } catch (InvalidStateTransitonException e) {
@ -613,9 +634,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
+ this.taskId, e); + this.taskId, e);
internalError(event.getType()); internalError(event.getType());
} }
if (oldState != getState()) { if (oldState != getInternalState()) {
LOG.info(taskId + " Task Transitioned from " + oldState + " to " LOG.info(taskId + " Task Transitioned from " + oldState + " to "
+ getState()); + getInternalState());
} }
} finally { } finally {
@ -659,7 +680,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
} }
} }
private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState) { private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
TaskFinishedEvent tfe = TaskFinishedEvent tfe =
new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId), new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
TypeConverter.fromYarn(task.successfulAttempt), TypeConverter.fromYarn(task.successfulAttempt),
@ -670,7 +691,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
return tfe; return tfe;
} }
private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskState taskState, TaskAttemptId taId) { private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
StringBuilder errorSb = new StringBuilder(); StringBuilder errorSb = new StringBuilder();
if (diag != null) { if (diag != null) {
for (String d : diag) { for (String d : diag) {
@ -775,7 +796,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
// issue kill to all other attempts // issue kill to all other attempts
if (task.historyTaskStartGenerated) { if (task.historyTaskStartGenerated) {
TaskFinishedEvent tfe = createTaskFinishedEvent(task, TaskFinishedEvent tfe = createTaskFinishedEvent(task,
TaskState.SUCCEEDED); TaskStateInternal.SUCCEEDED);
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
tfe)); tfe));
} }
@ -791,7 +812,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskAttemptEventType.TA_KILL)); TaskAttemptEventType.TA_KILL));
} }
} }
task.finished(TaskState.SUCCEEDED); task.finished(TaskStateInternal.SUCCEEDED);
} }
} }
@ -812,12 +833,12 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private static class KillWaitAttemptKilledTransition implements private static class KillWaitAttemptKilledTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskState> { MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
protected TaskState finalState = TaskState.KILLED; protected TaskStateInternal finalState = TaskStateInternal.KILLED;
@Override @Override
public TaskState transition(TaskImpl task, TaskEvent event) { public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.handleTaskAttemptCompletion( task.handleTaskAttemptCompletion(
((TaskTAttemptEvent) event).getTaskAttemptID(), ((TaskTAttemptEvent) event).getTaskAttemptID(),
TaskAttemptCompletionEventStatus.KILLED); TaskAttemptCompletionEventStatus.KILLED);
@ -835,18 +856,18 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
} }
task.eventHandler.handle( task.eventHandler.handle(
new JobTaskEvent(task.taskId, finalState)); new JobTaskEvent(task.taskId, getExternalState(finalState)));
return finalState; return finalState;
} }
return task.getState(); return task.getInternalState();
} }
} }
private static class AttemptFailedTransition implements private static class AttemptFailedTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskState> { MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
@Override @Override
public TaskState transition(TaskImpl task, TaskEvent event) { public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.failedAttempts++; task.failedAttempts++;
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) { if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
@ -878,7 +899,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
if (task.historyTaskStartGenerated) { if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(), TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
TaskState.FAILED, taId); TaskStateInternal.FAILED, taId);
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
taskFailedEvent)); taskFailedEvent));
} else { } else {
@ -887,13 +908,13 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
} }
task.eventHandler.handle( task.eventHandler.handle(
new JobTaskEvent(task.taskId, TaskState.FAILED)); new JobTaskEvent(task.taskId, TaskState.FAILED));
return task.finished(TaskState.FAILED); return task.finished(TaskStateInternal.FAILED);
} }
return getDefaultState(task); return getDefaultState(task);
} }
protected TaskState getDefaultState(Task task) { protected TaskStateInternal getDefaultState(TaskImpl task) {
return task.getState(); return task.getInternalState();
} }
} }
@ -901,14 +922,14 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
extends AttemptFailedTransition { extends AttemptFailedTransition {
@Override @Override
public TaskState transition(TaskImpl task, TaskEvent event) { public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
if (event instanceof TaskTAttemptEvent) { if (event instanceof TaskTAttemptEvent) {
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (task.getState() == TaskState.SUCCEEDED && if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
!castEvent.getTaskAttemptID().equals(task.successfulAttempt)) { !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous // don't allow a different task attempt to override a previous
// succeeded state // succeeded state
return TaskState.SUCCEEDED; return TaskStateInternal.SUCCEEDED;
} }
} }
@ -933,25 +954,25 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
} }
@Override @Override
protected TaskState getDefaultState(Task task) { protected TaskStateInternal getDefaultState(TaskImpl task) {
return TaskState.SCHEDULED; return TaskStateInternal.SCHEDULED;
} }
} }
private static class RetroactiveKilledTransition implements private static class RetroactiveKilledTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskState> { MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
@Override @Override
public TaskState transition(TaskImpl task, TaskEvent event) { public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
TaskAttemptId attemptId = null; TaskAttemptId attemptId = null;
if (event instanceof TaskTAttemptEvent) { if (event instanceof TaskTAttemptEvent) {
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
attemptId = castEvent.getTaskAttemptID(); attemptId = castEvent.getTaskAttemptID();
if (task.getState() == TaskState.SUCCEEDED && if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
!attemptId.equals(task.successfulAttempt)) { !attemptId.equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous // don't allow a different task attempt to override a previous
// succeeded state // succeeded state
return TaskState.SUCCEEDED; return TaskStateInternal.SUCCEEDED;
} }
} }
@ -977,7 +998,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
// to the RM. But the RM would ignore that just like it would ignore // to the RM. But the RM would ignore that just like it would ignore
// currently pending container requests affinitized to bad nodes. // currently pending container requests affinitized to bad nodes.
task.addAndScheduleAttempt(); task.addAndScheduleAttempt();
return TaskState.SCHEDULED; return TaskStateInternal.SCHEDULED;
} }
} }
@ -988,7 +1009,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
if (task.historyTaskStartGenerated) { if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null, TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
TaskState.KILLED, null); // TODO Verify failedAttemptId is null TaskStateInternal.KILLED, null); // TODO Verify failedAttemptId is null
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
taskFailedEvent)); taskFailedEvent));
}else { }else {
@ -996,8 +1017,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
" generated for task: " + task.getID()); " generated for task: " + task.getID());
} }
task.eventHandler.handle( task.eventHandler.handle(new JobTaskEvent(task.taskId,
new JobTaskEvent(task.taskId, TaskState.KILLED)); getExternalState(TaskStateInternal.KILLED)));
task.metrics.endWaitingTask(task); task.metrics.endWaitingTask(task);
} }
} }

View File

@ -31,10 +31,11 @@ import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -163,13 +164,14 @@ public abstract class RMCommunicator extends AbstractService {
protected void unregister() { protected void unregister() {
try { try {
FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED; FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
if (job.getState() == JobState.SUCCEEDED) { JobImpl jobImpl = (JobImpl)job;
if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) {
finishState = FinalApplicationStatus.SUCCEEDED; finishState = FinalApplicationStatus.SUCCEEDED;
} else if (job.getState() == JobState.KILLED } else if (jobImpl.getInternalState() == JobStateInternal.KILLED
|| (job.getState() == JobState.RUNNING && isSignalled)) { || (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) {
finishState = FinalApplicationStatus.KILLED; finishState = FinalApplicationStatus.KILLED;
} else if (job.getState() == JobState.FAILED } else if (jobImpl.getInternalState() == JobStateInternal.FAILED
|| job.getState() == JobState.ERROR) { || jobImpl.getInternalState() == JobStateInternal.ERROR) {
finishState = FinalApplicationStatus.FAILED; finishState = FinalApplicationStatus.FAILED;
} }
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();

View File

@ -365,7 +365,7 @@ public class DefaultSpeculator extends AbstractService implements
for (TaskAttempt taskAttempt : attempts.values()) { for (TaskAttempt taskAttempt : attempts.values()) {
if (taskAttempt.getState() == TaskAttemptState.RUNNING if (taskAttempt.getState() == TaskAttemptState.RUNNING
|| taskAttempt.getState() == TaskAttemptState.ASSIGNED) { || taskAttempt.getState() == TaskAttemptState.STARTING) {
if (++numberRunningAttempts > 1) { if (++numberRunningAttempts > 1) {
return ALREADY_SPECULATING; return ALREADY_SPECULATING;
} }

View File

@ -50,8 +50,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@ -60,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@ -240,6 +243,24 @@ public class MRApp extends MRAppMaster {
return job; return job;
} }
public void waitForInternalState(TaskAttemptImpl attempt,
TaskAttemptStateInternal finalState) throws Exception {
int timeoutSecs = 0;
TaskAttemptReport report = attempt.getReport();
TaskAttemptStateInternal iState = attempt.getInternalState();
while (!finalState.equals(iState) && timeoutSecs++ < 20) {
System.out.println("TaskAttempt Internal State is : " + iState
+ " Waiting for Internal state : " + finalState + " progress : "
+ report.getProgress());
Thread.sleep(500);
report = attempt.getReport();
iState = attempt.getInternalState();
}
System.out.println("TaskAttempt Internal State is : " + iState);
Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
finalState, iState);
}
public void waitForState(TaskAttempt attempt, public void waitForState(TaskAttempt attempt,
TaskAttemptState finalState) throws Exception { TaskAttemptState finalState) throws Exception {
int timeoutSecs = 0; int timeoutSecs = 0;
@ -501,18 +522,18 @@ public class MRApp extends MRAppMaster {
//override the init transition //override the init transition
private final TestInitTransition initTransition = new TestInitTransition( private final TestInitTransition initTransition = new TestInitTransition(
maps, reduces); maps, reduces);
StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
= stateMachineFactory.addTransition(JobState.NEW, = stateMachineFactory.addTransition(JobStateInternal.NEW,
EnumSet.of(JobState.INITED, JobState.FAILED), EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
JobEventType.JOB_INIT, JobEventType.JOB_INIT,
// This is abusive. // This is abusive.
initTransition); initTransition);
private final StateMachine<JobState, JobEventType, JobEvent> private final StateMachine<JobStateInternal, JobEventType, JobEvent>
localStateMachine; localStateMachine;
@Override @Override
protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() { protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
return localStateMachine; return localStateMachine;
} }

View File

@ -36,8 +36,10 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
@ -190,7 +192,8 @@ public class TestFail {
Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
.size()); .size());
TaskAttempt attempt = attempts.values().iterator().next(); TaskAttempt attempt = attempts.values().iterator().next();
app.waitForState(attempt, TaskAttemptState.ASSIGNED); app.waitForInternalState((TaskAttemptImpl) attempt,
TaskAttemptStateInternal.ASSIGNED);
app.getDispatcher().getEventHandler().handle( app.getDispatcher().getEventHandler().handle(
new TaskAttemptEvent(attempt.getID(), new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_CONTAINER_COMPLETED)); TaskAttemptEventType.TA_CONTAINER_COMPLETED));

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@ -56,11 +55,13 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
@ -411,8 +412,8 @@ public class TestRMContainerAllocator {
// Wait till all map-attempts request for containers // Wait till all map-attempts request for containers
for (Task t : job.getTasks().values()) { for (Task t : job.getTasks().values()) {
if (t.getType() == TaskType.MAP) { if (t.getType() == TaskType.MAP) {
mrApp.waitForState(t.getAttempts().values().iterator().next(), mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values()
TaskAttemptState.UNASSIGNED); .iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
} }
} }
amDispatcher.await(); amDispatcher.await();
@ -562,8 +563,8 @@ public class TestRMContainerAllocator {
amDispatcher.await(); amDispatcher.await();
// Wait till all map-attempts request for containers // Wait till all map-attempts request for containers
for (Task t : job.getTasks().values()) { for (Task t : job.getTasks().values()) {
mrApp.waitForState(t.getAttempts().values().iterator().next(), mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values()
TaskAttemptState.UNASSIGNED); .iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
} }
amDispatcher.await(); amDispatcher.await();

View File

@ -42,8 +42,8 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@ -77,11 +77,11 @@ public class TestJobImpl {
tasks.put(mockTask.getID(), mockTask); tasks.put(mockTask.getID(), mockTask);
mockJob.tasks = tasks; mockJob.tasks = tasks;
when(mockJob.getState()).thenReturn(JobState.ERROR); when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR);
JobEvent mockJobEvent = mock(JobEvent.class); JobEvent mockJobEvent = mock(JobEvent.class);
JobState state = trans.transition(mockJob, mockJobEvent); JobStateInternal state = trans.transition(mockJob, mockJobEvent);
Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition", Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
JobState.ERROR, state); JobStateInternal.ERROR, state);
} }
@Test @Test
@ -96,9 +96,12 @@ public class TestJobImpl {
when(mockJob.getCommitter()).thenReturn(mockCommitter); when(mockJob.getCommitter()).thenReturn(mockCommitter);
when(mockJob.getEventHandler()).thenReturn(mockEventHandler); when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
when(mockJob.getJobContext()).thenReturn(mockJobContext); when(mockJob.getJobContext()).thenReturn(mockJobContext);
when(mockJob.finished(JobState.KILLED)).thenReturn(JobState.KILLED); when(mockJob.finished(JobStateInternal.KILLED)).thenReturn(
when(mockJob.finished(JobState.FAILED)).thenReturn(JobState.FAILED); JobStateInternal.KILLED);
when(mockJob.finished(JobState.SUCCEEDED)).thenReturn(JobState.SUCCEEDED); when(mockJob.finished(JobStateInternal.FAILED)).thenReturn(
JobStateInternal.FAILED);
when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn(
JobStateInternal.SUCCEEDED);
try { try {
doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class)); doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
@ -106,11 +109,11 @@ public class TestJobImpl {
// commitJob stubbed out, so this can't happen // commitJob stubbed out, so this can't happen
} }
doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class)); doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
JobState jobState = JobImpl.checkJobCompleteSuccess(mockJob); JobStateInternal jobState = JobImpl.checkJobCompleteSuccess(mockJob);
Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " + Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
"for successful job", jobState); "for successful job", jobState);
Assert.assertEquals("checkJobCompleteSuccess returns incorrect state", Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
JobState.FAILED, jobState); JobStateInternal.FAILED, jobState);
verify(mockJob).abortJob( verify(mockJob).abortJob(
eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
} }
@ -129,7 +132,8 @@ public class TestJobImpl {
when(mockJob.getJobContext()).thenReturn(mockJobContext); when(mockJob.getJobContext()).thenReturn(mockJobContext);
doNothing().when(mockJob).setFinishTime(); doNothing().when(mockJob).setFinishTime();
doNothing().when(mockJob).logJobHistoryFinishedEvent(); doNothing().when(mockJob).logJobHistoryFinishedEvent();
when(mockJob.finished(any(JobState.class))).thenReturn(JobState.SUCCEEDED); when(mockJob.finished(any(JobStateInternal.class))).thenReturn(
JobStateInternal.SUCCEEDED);
try { try {
doNothing().when(mockCommitter).commitJob(any(JobContext.class)); doNothing().when(mockCommitter).commitJob(any(JobContext.class));
@ -141,7 +145,7 @@ public class TestJobImpl {
"for successful job", "for successful job",
JobImpl.checkJobCompleteSuccess(mockJob)); JobImpl.checkJobCompleteSuccess(mockJob));
Assert.assertEquals("checkJobCompleteSuccess returns incorrect state", Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
JobState.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob)); JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
} }
@Test @Test

View File

@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -48,13 +47,13 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -338,7 +337,7 @@ public class TestTaskImpl {
* {@link TaskState#KILL_WAIT} * {@link TaskState#KILL_WAIT}
*/ */
private void assertTaskKillWaitState() { private void assertTaskKillWaitState() {
assertEquals(TaskState.KILL_WAIT, mockTask.getState()); assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
} }
/** /**

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
@ -260,7 +262,8 @@ public class TestContainerLauncher {
attempts.size()); attempts.size());
TaskAttempt attempt = attempts.values().iterator().next(); TaskAttempt attempt = attempts.values().iterator().next();
app.waitForState(attempt, TaskAttemptState.ASSIGNED); app.waitForInternalState((TaskAttemptImpl) attempt,
TaskAttemptStateInternal.ASSIGNED);
app.waitForState(job, JobState.FAILED); app.waitForState(job, JobState.FAILED);

View File

@ -128,14 +128,26 @@ public class TypeConverter {
return taskId; return taskId;
} }
public static TaskAttemptState toYarn(org.apache.hadoop.mapred.TaskStatus.State state) { public static TaskAttemptState toYarn(
if (state == org.apache.hadoop.mapred.TaskStatus.State.KILLED_UNCLEAN) { org.apache.hadoop.mapred.TaskStatus.State state) {
return TaskAttemptState.KILLED; switch (state) {
} case COMMIT_PENDING:
if (state == org.apache.hadoop.mapred.TaskStatus.State.FAILED_UNCLEAN) { return TaskAttemptState.COMMIT_PENDING;
case FAILED:
case FAILED_UNCLEAN:
return TaskAttemptState.FAILED; return TaskAttemptState.FAILED;
case KILLED:
case KILLED_UNCLEAN:
return TaskAttemptState.KILLED;
case RUNNING:
return TaskAttemptState.RUNNING;
case SUCCEEDED:
return TaskAttemptState.SUCCEEDED;
case UNASSIGNED:
return TaskAttemptState.STARTING;
default:
throw new YarnException("Unrecognized State: " + state);
} }
return TaskAttemptState.valueOf(state.toString());
} }
public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) { public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) {
@ -309,7 +321,6 @@ public class TypeConverter {
return org.apache.hadoop.mapred.JobStatus.PREP; return org.apache.hadoop.mapred.JobStatus.PREP;
case RUNNING: case RUNNING:
return org.apache.hadoop.mapred.JobStatus.RUNNING; return org.apache.hadoop.mapred.JobStatus.RUNNING;
case KILL_WAIT:
case KILLED: case KILLED:
return org.apache.hadoop.mapred.JobStatus.KILLED; return org.apache.hadoop.mapred.JobStatus.KILLED;
case SUCCEEDED: case SUCCEEDED:
@ -329,7 +340,6 @@ public class TypeConverter {
return org.apache.hadoop.mapred.TIPStatus.PENDING; return org.apache.hadoop.mapred.TIPStatus.PENDING;
case RUNNING: case RUNNING:
return org.apache.hadoop.mapred.TIPStatus.RUNNING; return org.apache.hadoop.mapred.TIPStatus.RUNNING;
case KILL_WAIT:
case KILLED: case KILLED:
return org.apache.hadoop.mapred.TIPStatus.KILLED; return org.apache.hadoop.mapred.TIPStatus.KILLED;
case SUCCEEDED: case SUCCEEDED:

View File

@ -24,7 +24,6 @@ public enum JobState {
RUNNING, RUNNING,
SUCCEEDED, SUCCEEDED,
FAILED, FAILED,
KILL_WAIT,
KILLED, KILLED,
ERROR ERROR
} }

View File

@ -20,16 +20,10 @@ package org.apache.hadoop.mapreduce.v2.api.records;
public enum TaskAttemptState { public enum TaskAttemptState {
NEW, NEW,
UNASSIGNED, STARTING,
ASSIGNED,
RUNNING, RUNNING,
COMMIT_PENDING, COMMIT_PENDING,
SUCCESS_CONTAINER_CLEANUP, SUCCEEDED,
SUCCEEDED, FAILED,
FAIL_CONTAINER_CLEANUP,
FAIL_TASK_CLEANUP,
FAILED,
KILL_CONTAINER_CLEANUP,
KILL_TASK_CLEANUP,
KILLED KILLED
} }

View File

@ -19,5 +19,5 @@
package org.apache.hadoop.mapreduce.v2.api.records; package org.apache.hadoop.mapreduce.v2.api.records;
public enum TaskState { public enum TaskState {
NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILLED
} }

View File

@ -49,8 +49,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.yarn.ContainerLogAppender; import org.apache.hadoop.yarn.ContainerLogAppender;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -100,15 +100,10 @@ public class MRApps extends Apps {
public static enum TaskAttemptStateUI { public static enum TaskAttemptStateUI {
NEW( NEW(
new TaskAttemptState[] { TaskAttemptState.NEW, new TaskAttemptState[] { TaskAttemptState.NEW,
TaskAttemptState.UNASSIGNED, TaskAttemptState.ASSIGNED }), TaskAttemptState.STARTING }),
RUNNING( RUNNING(
new TaskAttemptState[] { TaskAttemptState.RUNNING, new TaskAttemptState[] { TaskAttemptState.RUNNING,
TaskAttemptState.COMMIT_PENDING, TaskAttemptState.COMMIT_PENDING }),
TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
TaskAttemptState.FAIL_TASK_CLEANUP,
TaskAttemptState.KILL_CONTAINER_CLEANUP,
TaskAttemptState.KILL_TASK_CLEANUP }),
SUCCESSFUL(new TaskAttemptState[] { TaskAttemptState.SUCCEEDED}), SUCCESSFUL(new TaskAttemptState[] { TaskAttemptState.SUCCEEDED}),
FAILED(new TaskAttemptState[] { TaskAttemptState.FAILED}), FAILED(new TaskAttemptState[] { TaskAttemptState.FAILED}),
KILLED(new TaskAttemptState[] { TaskAttemptState.KILLED}); KILLED(new TaskAttemptState[] { TaskAttemptState.KILLED});

View File

@ -50,8 +50,7 @@ enum TaskStateProto {
TS_RUNNING = 3; TS_RUNNING = 3;
TS_SUCCEEDED = 4; TS_SUCCEEDED = 4;
TS_FAILED = 5; TS_FAILED = 5;
TS_KILL_WAIT = 6; TS_KILLED = 6;
TS_KILLED = 7;
} }
enum PhaseProto { enum PhaseProto {
@ -93,18 +92,12 @@ message TaskReportProto {
enum TaskAttemptStateProto { enum TaskAttemptStateProto {
TA_NEW = 1; TA_NEW = 1;
TA_UNASSIGNED = 2; TA_STARTING = 2;
TA_ASSIGNED = 3; TA_RUNNING = 3;
TA_RUNNING = 4; TA_COMMIT_PENDING = 4;
TA_COMMIT_PENDING = 5; TA_SUCCEEDED = 5;
TA_SUCCESS_CONTAINER_CLEANUP = 6; TA_FAILED = 6;
TA_SUCCEEDED = 7; TA_KILLED = 7;
TA_FAIL_CONTAINER_CLEANUP = 8;
TA_FAIL_TASK_CLEANUP = 9;
TA_FAILED = 10;
TA_KILL_CONTAINER_CLEANUP = 11;
TA_KILL_TASK_CLEANUP = 12;
TA_KILLED = 13;
} }
message TaskAttemptReportProto { message TaskAttemptReportProto {
@ -131,9 +124,8 @@ enum JobStateProto {
J_RUNNING = 3; J_RUNNING = 3;
J_SUCCEEDED = 4; J_SUCCEEDED = 4;
J_FAILED = 5; J_FAILED = 5;
J_KILL_WAIT = 6; J_KILLED = 6;
J_KILLED = 7; J_ERROR = 7;
J_ERROR = 8;
} }
message JobReportProto { message JobReportProto {