Merge -c 1188388 from trunk to branch-0.23 to complete fix for MAPREDUCE-3252.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1188467 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-25 00:41:52 +00:00
parent 21ded50c4d
commit 1527edc627
5 changed files with 42 additions and 24 deletions

View File

@ -1691,6 +1691,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3028. Added job-end notification support. (Ravi Prakash via
acmurthy)
MAPREDUCE-3249. Ensure shuffle-port is correctly used duringMR AM recovery.
(vinodkv via acmurthy)
MAPREDUCE-3252. Fix map tasks to not rewrite data an extra time when
map output fits in spill buffer. (todd)

View File

@ -568,7 +568,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
//raise the completion event only if the container is assigned
// to nextAttemptNumber
if (attempt.getNodeHttpAddress() != null) {
TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1);
tce.setMapOutputServerAddress("http://"
+ attempt.getNodeHttpAddress().split(":")[0] + ":"

View File

@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
/*
* Recovers the completed tasks from the previous life of Application Master.
@ -313,8 +314,8 @@ public class RecoveryService extends CompositeService implements Recovery {
TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
.getTaskAttemptID();
TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
//TODO need to get the real port number MAPREDUCE-2666
actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, -1));
actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
attInfo.getShufflePort()));
// send the status update event
sendStatusUpdateEvent(aId, attInfo);
@ -392,16 +393,15 @@ public class RecoveryService extends CompositeService implements Recovery {
TaskAttemptInfo attemptInfo) {
LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = attemptInfo.getContainerId();
Container container = recordFactory
.newRecordInstance(Container.class);
container.setId(cId);
container.setNodeId(recordFactory
.newRecordInstance(NodeId.class));
// NodeId can be obtained from TaskAttemptInfo.hostname - but this will
// eventually contain rack info.
container.setContainerToken(null);
container.setNodeHttpAddress(attemptInfo.getTrackerName() + ":" +
attemptInfo.getHttpPort());
String[] splits = attemptInfo.getHostname().split(":");
NodeId nodeId = BuilderUtils.newNodeId(splits[0], Integer
.parseInt(splits[1]));
// Resource/Priority/ApplicationACLs are only needed while launching the
// container on an NM, these are already completed tasks, so setting them
// to null
Container container = BuilderUtils.newContainer(cId, nodeId,
attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
null, null, null);
actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
container, null));
}

View File

@ -315,15 +315,17 @@ public class MRApp extends MRAppMaster {
}
class MockContainerLauncher implements ContainerLauncher {
//We are running locally so set the shuffle port to -1
int shufflePort = -1;
@Override
public void handle(ContainerLauncherEvent event) {
switch (event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
//We are running locally so set the shuffle port to -1
getContext().getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
-1)
);
shufflePort));
attemptLaunched(event.getTaskAttemptID());
break;
@ -355,13 +357,9 @@ public class MRApp extends MRAppMaster {
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
cId.setId(containerCount++);
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(cId);
container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
container.getNodeId().setHost("dummy");
container.getNodeId().setPort(1234);
container.setContainerToken(null);
container.setNodeHttpAddress("localhost:9999");
NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
Container container = BuilderUtils.newContainer(cId, nodeId,
"localhost:9999", null, null, null);
getContext().getEventHandler().handle(
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null));

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
@ -270,6 +271,9 @@ public class TestRecovery {
//wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
@ -290,7 +294,8 @@ public class TestRecovery {
//rerun
//in rerun the map will be recovered from previous run
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
@ -309,6 +314,10 @@ public class TestRecovery {
// map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port after recovery
task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
// first reduce will be recovered, no need to send done
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
@ -397,6 +406,13 @@ public class TestRecovery {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
}
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
MockContainerLauncher launcher = new MockContainerLauncher();
launcher.shufflePort = 5467;
return launcher;
}
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {