MAPREDUCE-3249. Ensure shuffle-port is correctly used duringMR AM recovery. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1188388 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-24 21:19:31 +00:00
parent 6288dfa873
commit a503755990
5 changed files with 42 additions and 24 deletions

View File

@ -1747,6 +1747,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3028. Added job-end notification support. (Ravi Prakash via MAPREDUCE-3028. Added job-end notification support. (Ravi Prakash via
acmurthy) acmurthy)
MAPREDUCE-3249. Ensure shuffle-port is correctly used duringMR AM recovery.
(vinodkv via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -568,7 +568,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
//raise the completion event only if the container is assigned //raise the completion event only if the container is assigned
// to nextAttemptNumber // to nextAttemptNumber
if (attempt.getNodeHttpAddress() != null) { if (attempt.getNodeHttpAddress() != null) {
TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class); TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1); tce.setEventId(-1);
tce.setMapOutputServerAddress("http://" tce.setMapOutputServerAddress("http://"
+ attempt.getNodeHttpAddress().split(":")[0] + ":" + attempt.getNodeHttpAddress().split(":")[0] + ":"

View File

@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
/* /*
* Recovers the completed tasks from the previous life of Application Master. * Recovers the completed tasks from the previous life of Application Master.
@ -313,8 +314,8 @@ public class RecoveryService extends CompositeService implements Recovery {
TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event) TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
.getTaskAttemptID(); .getTaskAttemptID();
TaskAttemptInfo attInfo = getTaskAttemptInfo(aId); TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
//TODO need to get the real port number MAPREDUCE-2666 actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, -1)); attInfo.getShufflePort()));
// send the status update event // send the status update event
sendStatusUpdateEvent(aId, attInfo); sendStatusUpdateEvent(aId, attInfo);
@ -392,16 +393,15 @@ public class RecoveryService extends CompositeService implements Recovery {
TaskAttemptInfo attemptInfo) { TaskAttemptInfo attemptInfo) {
LOG.info("Sending assigned event to " + yarnAttemptID); LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = attemptInfo.getContainerId(); ContainerId cId = attemptInfo.getContainerId();
Container container = recordFactory String[] splits = attemptInfo.getHostname().split(":");
.newRecordInstance(Container.class); NodeId nodeId = BuilderUtils.newNodeId(splits[0], Integer
container.setId(cId); .parseInt(splits[1]));
container.setNodeId(recordFactory // Resource/Priority/ApplicationACLs are only needed while launching the
.newRecordInstance(NodeId.class)); // container on an NM, these are already completed tasks, so setting them
// NodeId can be obtained from TaskAttemptInfo.hostname - but this will // to null
// eventually contain rack info. Container container = BuilderUtils.newContainer(cId, nodeId,
container.setContainerToken(null); attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
container.setNodeHttpAddress(attemptInfo.getTrackerName() + ":" + null, null, null);
attemptInfo.getHttpPort());
actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID, actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
container, null)); container, null));
} }

View File

@ -315,15 +315,17 @@ public class MRApp extends MRAppMaster {
} }
class MockContainerLauncher implements ContainerLauncher { class MockContainerLauncher implements ContainerLauncher {
//We are running locally so set the shuffle port to -1
int shufflePort = -1;
@Override @Override
public void handle(ContainerLauncherEvent event) { public void handle(ContainerLauncherEvent event) {
switch (event.getType()) { switch (event.getType()) {
case CONTAINER_REMOTE_LAUNCH: case CONTAINER_REMOTE_LAUNCH:
//We are running locally so set the shuffle port to -1
getContext().getEventHandler().handle( getContext().getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(), new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
-1) shufflePort));
);
attemptLaunched(event.getTaskAttemptID()); attemptLaunched(event.getTaskAttemptID());
break; break;
@ -355,13 +357,9 @@ public class MRApp extends MRAppMaster {
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class); ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(getContext().getApplicationAttemptId()); cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
cId.setId(containerCount++); cId.setId(containerCount++);
Container container = recordFactory.newRecordInstance(Container.class); NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
container.setId(cId); Container container = BuilderUtils.newContainer(cId, nodeId,
container.setNodeId(recordFactory.newRecordInstance(NodeId.class)); "localhost:9999", null, null, null);
container.getNodeId().setHost("dummy");
container.getNodeId().setPort(1234);
container.setContainerToken(null);
container.setNodeHttpAddress("localhost:9999");
getContext().getEventHandler().handle( getContext().getEventHandler().handle(
new TaskAttemptContainerAssignedEvent(event.getAttemptID(), new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null)); container, null));

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test; import org.junit.Test;
@ -269,6 +270,9 @@ public class TestRecovery {
//wait for map task to complete //wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED); app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
app.waitForState(reduceTask1, TaskState.RUNNING); app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next(); TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
@ -290,7 +294,8 @@ public class TestRecovery {
//rerun //rerun
//in rerun the map will be recovered from previous run //in rerun the map will be recovered from previous run
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount); app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration(); conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true); conf.setBoolean("mapred.mapper.new-api", true);
@ -308,6 +313,10 @@ public class TestRecovery {
// map will be recovered, no need to send done // map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED); app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port after recovery
task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
// first reduce will be recovered, no need to send done // first reduce will be recovered, no need to send done
app.waitForState(reduceTask1, TaskState.SUCCEEDED); app.waitForState(reduceTask1, TaskState.SUCCEEDED);
@ -397,6 +406,13 @@ public class TestRecovery {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
} }
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
MockContainerLauncher launcher = new MockContainerLauncher();
launcher.shufflePort = 5467;
return launcher;
}
@Override @Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler( protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) { AppContext context) {