Merge -r 1448614:1448615 from trunk to branch. Fixes: MAPREDUCE-4951. Container preemption interpreted as task failure. Contributed by Sandy Ryza.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1448618 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a9f552cb33
commit
96b3905afd
|
@ -28,6 +28,9 @@ Release 2.0.4-beta - UNRELEASED
|
||||||
MAPREDUCE-5013. mapred.JobStatus compatibility: MR2 missing constructors
|
MAPREDUCE-5013. mapred.JobStatus compatibility: MR2 missing constructors
|
||||||
from MR1. (Sandy Ryza via tomwhite)
|
from MR1. (Sandy Ryza via tomwhite)
|
||||||
|
|
||||||
|
MAPREDUCE-4951. Container preemption interpreted as task failure.
|
||||||
|
(Sandy Ryza via tomwhite)
|
||||||
|
|
||||||
Release 2.0.3-alpha - 2013-02-06
|
Release 2.0.3-alpha - 2013-02-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -238,7 +238,6 @@ public abstract class TaskAttemptImpl implements
|
||||||
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
|
||||||
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
||||||
CLEANUP_CONTAINER_TRANSITION)
|
CLEANUP_CONTAINER_TRANSITION)
|
||||||
// ^ If RM kills the container due to expiry, preemption etc.
|
|
||||||
.addTransition(TaskAttemptStateInternal.ASSIGNED,
|
.addTransition(TaskAttemptStateInternal.ASSIGNED,
|
||||||
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
|
||||||
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
|
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
|
||||||
|
|
|
@ -67,9 +67,12 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.util.RackResolver;
|
import org.apache.hadoop.yarn.util.RackResolver;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocates the container from the ResourceManager scheduler.
|
* Allocates the container from the ResourceManager scheduler.
|
||||||
*/
|
*/
|
||||||
|
@ -606,8 +609,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
||||||
assignedRequests.remove(attemptID);
|
assignedRequests.remove(attemptID);
|
||||||
|
|
||||||
// send the container completed event to Task attempt
|
// send the container completed event to Task attempt
|
||||||
eventHandler.handle(new TaskAttemptEvent(attemptID,
|
eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
|
||||||
TaskAttemptEventType.TA_CONTAINER_COMPLETED));
|
|
||||||
// Send the diagnostics
|
// Send the diagnostics
|
||||||
String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
|
String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
|
||||||
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
|
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
|
||||||
|
@ -617,6 +620,19 @@ public class RMContainerAllocator extends RMContainerRequestor
|
||||||
return newContainers;
|
return newContainers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
|
||||||
|
TaskAttemptId attemptID) {
|
||||||
|
if (cont.getExitStatus() == YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS) {
|
||||||
|
// killed by framework
|
||||||
|
return new TaskAttemptEvent(attemptID,
|
||||||
|
TaskAttemptEventType.TA_KILL);
|
||||||
|
} else {
|
||||||
|
return new TaskAttemptEvent(attemptID,
|
||||||
|
TaskAttemptEventType.TA_CONTAINER_COMPLETED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private void handleUpdatedNodes(AMResponse response) {
|
private void handleUpdatedNodes(AMResponse response) {
|
||||||
// send event to the job about on updated nodes
|
// send event to the job about on updated nodes
|
||||||
|
|
|
@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Event;
|
import org.apache.hadoop.yarn.event.Event;
|
||||||
|
@ -1645,6 +1646,32 @@ public class TestRMContainerAllocator {
|
||||||
Assert.assertTrue(callbackCalled.get());
|
Assert.assertTrue(callbackCalled.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCompletedContainerEvent() {
|
||||||
|
RMContainerAllocator allocator = new RMContainerAllocator(
|
||||||
|
mock(ClientService.class), mock(AppContext.class));
|
||||||
|
|
||||||
|
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
|
||||||
|
MRBuilderUtils.newTaskId(
|
||||||
|
MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
|
||||||
|
ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||||
|
ContainerStatus status = BuilderUtils.newContainerStatus(
|
||||||
|
containerId, ContainerState.RUNNING, "", 0);
|
||||||
|
|
||||||
|
ContainerStatus abortedStatus = BuilderUtils.newContainerStatus(
|
||||||
|
containerId, ContainerState.RUNNING, "",
|
||||||
|
YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS);
|
||||||
|
|
||||||
|
TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
|
||||||
|
attemptId);
|
||||||
|
Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
|
||||||
|
event.getType());
|
||||||
|
|
||||||
|
TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
|
||||||
|
abortedStatus, attemptId);
|
||||||
|
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestRMContainerAllocator t = new TestRMContainerAllocator();
|
TestRMContainerAllocator t = new TestRMContainerAllocator();
|
||||||
t.testSimple();
|
t.testSimple();
|
||||||
|
|
Loading…
Reference in New Issue