MAPREDUCE-5152. Make MR App to simply pass through the container from RM instead of extracting and populating information itself to start any container. Contributed by Vinod Kumar Vavilapalli.
svn merge --ignore-ancestry -c 1469544 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1469545 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
df24059512
commit
0df869e1f7
|
@ -43,6 +43,10 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
MAPREDUCE-4985. Add compression option to TestDFSIO usage.
|
MAPREDUCE-4985. Add compression option to TestDFSIO usage.
|
||||||
(Plamen Jeliazkov via shv)
|
(Plamen Jeliazkov via shv)
|
||||||
|
|
||||||
|
MAPREDUCE-5152. Make MR App to simply pass through the container from RM
|
||||||
|
instead of extracting and populating information itself to start any
|
||||||
|
container. (vinodkv)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
|
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
|
||||||
|
|
|
@ -117,7 +117,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
|
@ -490,14 +489,10 @@ public abstract class TaskAttemptImpl implements
|
||||||
<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
|
<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
|
||||||
stateMachine;
|
stateMachine;
|
||||||
|
|
||||||
private ContainerId containerID;
|
@VisibleForTesting
|
||||||
private NodeId containerNodeId;
|
public Container container;
|
||||||
private String containerMgrAddress;
|
|
||||||
private String nodeHttpAddress;
|
|
||||||
private String nodeRackName;
|
private String nodeRackName;
|
||||||
private WrappedJvmID jvmID;
|
private WrappedJvmID jvmID;
|
||||||
private ContainerToken containerToken;
|
|
||||||
private Resource assignedCapability;
|
|
||||||
|
|
||||||
//this takes good amount of memory ~ 30KB. Instantiate it lazily
|
//this takes good amount of memory ~ 30KB. Instantiate it lazily
|
||||||
//and make it null once task is launched.
|
//and make it null once task is launched.
|
||||||
|
@ -825,7 +820,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
public ContainerId getAssignedContainerID() {
|
public ContainerId getAssignedContainerID() {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
return containerID;
|
return container == null ? null : container.getId();
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -835,7 +830,8 @@ public abstract class TaskAttemptImpl implements
|
||||||
public String getAssignedContainerMgrAddress() {
|
public String getAssignedContainerMgrAddress() {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
return containerMgrAddress;
|
return container == null ? null : StringInterner.weakIntern(container
|
||||||
|
.getNodeId().toString());
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -895,7 +891,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
public NodeId getNodeId() {
|
public NodeId getNodeId() {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
return containerNodeId;
|
return container == null ? null : container.getNodeId();
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -907,7 +903,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
public String getNodeHttpAddress() {
|
public String getNodeHttpAddress() {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
return nodeHttpAddress;
|
return container == null ? null : container.getNodeHttpAddress();
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -967,8 +963,8 @@ public abstract class TaskAttemptImpl implements
|
||||||
result.setContainerId(this.getAssignedContainerID());
|
result.setContainerId(this.getAssignedContainerID());
|
||||||
result.setNodeManagerHost(trackerName);
|
result.setNodeManagerHost(trackerName);
|
||||||
result.setNodeManagerHttpPort(httpPort);
|
result.setNodeManagerHttpPort(httpPort);
|
||||||
if (this.containerNodeId != null) {
|
if (this.container != null) {
|
||||||
result.setNodeManagerPort(this.containerNodeId.getPort());
|
result.setNodeManagerPort(this.container.getNodeId().getPort());
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1093,13 +1089,17 @@ public abstract class TaskAttemptImpl implements
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
|
public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
|
||||||
OutputCommitter committer, boolean recoverOutput) {
|
OutputCommitter committer, boolean recoverOutput) {
|
||||||
containerID = taInfo.getContainerId();
|
ContainerId containerId = taInfo.getContainerId();
|
||||||
containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
|
NodeId containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
|
||||||
+ taInfo.getPort());
|
+ taInfo.getPort());
|
||||||
containerMgrAddress = StringInterner.weakIntern(
|
String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
|
||||||
containerNodeId.toString());
|
|
||||||
nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
|
|
||||||
+ taInfo.getHttpPort());
|
+ taInfo.getHttpPort());
|
||||||
|
// Resource/Priority/Tokens are only needed while launching the
|
||||||
|
// container on an NM, these are already completed tasks, so setting them to
|
||||||
|
// null
|
||||||
|
container =
|
||||||
|
BuilderUtils.newContainer(containerId, containerNodeId,
|
||||||
|
nodeHttpAddress, null, null, null);
|
||||||
computeRackAndLocality();
|
computeRackAndLocality();
|
||||||
launchTime = taInfo.getStartTime();
|
launchTime = taInfo.getStartTime();
|
||||||
finishTime = (taInfo.getFinishTime() != -1) ?
|
finishTime = (taInfo.getFinishTime() != -1) ?
|
||||||
|
@ -1227,6 +1227,7 @@ public abstract class TaskAttemptImpl implements
|
||||||
}
|
}
|
||||||
|
|
||||||
private void computeRackAndLocality() {
|
private void computeRackAndLocality() {
|
||||||
|
NodeId containerNodeId = container.getNodeId();
|
||||||
nodeRackName = RackResolver.resolve(
|
nodeRackName = RackResolver.resolve(
|
||||||
containerNodeId.getHost()).getNetworkLocation();
|
containerNodeId.getHost()).getNetworkLocation();
|
||||||
|
|
||||||
|
@ -1331,10 +1332,10 @@ public abstract class TaskAttemptImpl implements
|
||||||
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
|
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
|
||||||
.getTaskType()), attemptState.toString(),
|
.getTaskType()), attemptState.toString(),
|
||||||
taskAttempt.finishTime,
|
taskAttempt.finishTime,
|
||||||
taskAttempt.containerNodeId == null ? "UNKNOWN"
|
taskAttempt.container == null ? "UNKNOWN"
|
||||||
: taskAttempt.containerNodeId.getHost(),
|
: taskAttempt.container.getNodeId().getHost(),
|
||||||
taskAttempt.containerNodeId == null ? -1
|
taskAttempt.container == null ? -1
|
||||||
: taskAttempt.containerNodeId.getPort(),
|
: taskAttempt.container.getNodeId().getPort(),
|
||||||
taskAttempt.nodeRackName == null ? "UNKNOWN"
|
taskAttempt.nodeRackName == null ? "UNKNOWN"
|
||||||
: taskAttempt.nodeRackName,
|
: taskAttempt.nodeRackName,
|
||||||
StringUtils.join(
|
StringUtils.join(
|
||||||
|
@ -1353,12 +1354,12 @@ public abstract class TaskAttemptImpl implements
|
||||||
eventHandler.handle(jce);
|
eventHandler.handle(jce);
|
||||||
|
|
||||||
LOG.info("TaskAttempt: [" + attemptId
|
LOG.info("TaskAttempt: [" + attemptId
|
||||||
+ "] using containerId: [" + containerID + " on NM: ["
|
+ "] using containerId: [" + container.getId() + " on NM: ["
|
||||||
+ containerMgrAddress + "]");
|
+ StringInterner.weakIntern(container.getNodeId().toString()) + "]");
|
||||||
TaskAttemptStartedEvent tase =
|
TaskAttemptStartedEvent tase =
|
||||||
new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
|
new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
|
||||||
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
|
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
|
||||||
launchTime, trackerName, httpPort, shufflePort, containerID,
|
launchTime, trackerName, httpPort, shufflePort, container.getId(),
|
||||||
locality.toString(), avataar.toString());
|
locality.toString(), avataar.toString());
|
||||||
eventHandler.handle(
|
eventHandler.handle(
|
||||||
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
|
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
|
||||||
|
@ -1490,19 +1491,14 @@ public abstract class TaskAttemptImpl implements
|
||||||
TaskAttemptEvent event) {
|
TaskAttemptEvent event) {
|
||||||
final TaskAttemptContainerAssignedEvent cEvent =
|
final TaskAttemptContainerAssignedEvent cEvent =
|
||||||
(TaskAttemptContainerAssignedEvent) event;
|
(TaskAttemptContainerAssignedEvent) event;
|
||||||
taskAttempt.containerID = cEvent.getContainer().getId();
|
Container container = cEvent.getContainer();
|
||||||
taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
|
taskAttempt.container = container;
|
||||||
taskAttempt.containerMgrAddress = StringInterner.weakIntern(
|
|
||||||
taskAttempt.containerNodeId.toString());
|
|
||||||
taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
|
|
||||||
cEvent.getContainer().getNodeHttpAddress());
|
|
||||||
taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
|
|
||||||
taskAttempt.assignedCapability = cEvent.getContainer().getResource();
|
|
||||||
// this is a _real_ Task (classic Hadoop mapred flavor):
|
// this is a _real_ Task (classic Hadoop mapred flavor):
|
||||||
taskAttempt.remoteTask = taskAttempt.createRemoteTask();
|
taskAttempt.remoteTask = taskAttempt.createRemoteTask();
|
||||||
taskAttempt.jvmID = new WrappedJvmID(
|
taskAttempt.jvmID =
|
||||||
taskAttempt.remoteTask.getTaskID().getJobID(),
|
new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(),
|
||||||
taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
|
taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId()
|
||||||
|
.getId());
|
||||||
taskAttempt.taskAttemptListener.registerPendingTask(
|
taskAttempt.taskAttemptListener.registerPendingTask(
|
||||||
taskAttempt.remoteTask, taskAttempt.jvmID);
|
taskAttempt.remoteTask, taskAttempt.jvmID);
|
||||||
|
|
||||||
|
@ -1514,10 +1510,9 @@ public abstract class TaskAttemptImpl implements
|
||||||
cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
|
cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
|
||||||
taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
|
taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
|
||||||
taskAttempt.taskAttemptListener, taskAttempt.credentials);
|
taskAttempt.taskAttemptListener, taskAttempt.credentials);
|
||||||
taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
|
taskAttempt.eventHandler
|
||||||
taskAttempt.attemptId, taskAttempt.containerID,
|
.handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
|
||||||
taskAttempt.containerMgrAddress, taskAttempt.containerToken,
|
launchContext, container, taskAttempt.remoteTask));
|
||||||
launchContext, taskAttempt.assignedCapability, taskAttempt.remoteTask));
|
|
||||||
|
|
||||||
// send event to speculator that our container needs are satisfied
|
// send event to speculator that our container needs are satisfied
|
||||||
taskAttempt.eventHandler.handle
|
taskAttempt.eventHandler.handle
|
||||||
|
@ -1604,9 +1599,8 @@ public abstract class TaskAttemptImpl implements
|
||||||
taskAttempt.taskAttemptListener
|
taskAttempt.taskAttemptListener
|
||||||
.registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
|
.registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
|
||||||
//TODO Resolve to host / IP in case of a local address.
|
//TODO Resolve to host / IP in case of a local address.
|
||||||
InetSocketAddress nodeHttpInetAddr =
|
InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
|
||||||
NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
|
NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
|
||||||
// Costly?
|
|
||||||
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
|
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
|
||||||
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
|
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
|
||||||
taskAttempt.sendLaunchedEvents();
|
taskAttempt.sendLaunchedEvents();
|
||||||
|
@ -1713,6 +1707,10 @@ public abstract class TaskAttemptImpl implements
|
||||||
private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
|
private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
|
||||||
//Log finished events only if an attempt started.
|
//Log finished events only if an attempt started.
|
||||||
if (getLaunchTime() == 0) return;
|
if (getLaunchTime() == 0) return;
|
||||||
|
String containerHostName = this.container == null ? "UNKNOWN"
|
||||||
|
: this.container.getNodeId().getHost();
|
||||||
|
int containerNodePort =
|
||||||
|
this.container == null ? -1 : this.container.getNodeId().getPort();
|
||||||
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
|
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
|
||||||
MapAttemptFinishedEvent mfe =
|
MapAttemptFinishedEvent mfe =
|
||||||
new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
|
new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
|
||||||
|
@ -1720,9 +1718,8 @@ public abstract class TaskAttemptImpl implements
|
||||||
state.toString(),
|
state.toString(),
|
||||||
this.reportedStatus.mapFinishTime,
|
this.reportedStatus.mapFinishTime,
|
||||||
finishTime,
|
finishTime,
|
||||||
this.containerNodeId == null ? "UNKNOWN"
|
containerHostName,
|
||||||
: this.containerNodeId.getHost(),
|
containerNodePort,
|
||||||
this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
|
|
||||||
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
|
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
|
||||||
this.reportedStatus.stateString,
|
this.reportedStatus.stateString,
|
||||||
getCounters(),
|
getCounters(),
|
||||||
|
@ -1737,9 +1734,8 @@ public abstract class TaskAttemptImpl implements
|
||||||
this.reportedStatus.shuffleFinishTime,
|
this.reportedStatus.shuffleFinishTime,
|
||||||
this.reportedStatus.sortFinishTime,
|
this.reportedStatus.sortFinishTime,
|
||||||
finishTime,
|
finishTime,
|
||||||
this.containerNodeId == null ? "UNKNOWN"
|
containerHostName,
|
||||||
: this.containerNodeId.getHost(),
|
containerNodePort,
|
||||||
this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
|
|
||||||
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
|
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
|
||||||
this.reportedStatus.stateString,
|
this.reportedStatus.stateString,
|
||||||
getCounters(),
|
getCounters(),
|
||||||
|
@ -1864,8 +1860,9 @@ public abstract class TaskAttemptImpl implements
|
||||||
//send the cleanup event to containerLauncher
|
//send the cleanup event to containerLauncher
|
||||||
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
|
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
|
||||||
taskAttempt.attemptId,
|
taskAttempt.attemptId,
|
||||||
taskAttempt.containerID, taskAttempt.containerMgrAddress,
|
taskAttempt.container.getId(), StringInterner
|
||||||
taskAttempt.containerToken,
|
.weakIntern(taskAttempt.container.getNodeId().toString()),
|
||||||
|
taskAttempt.container.getContainerToken(),
|
||||||
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
|
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
||||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
@ -149,16 +148,13 @@ public class ContainerLauncherImpl extends AbstractService implements
|
||||||
|
|
||||||
// Construct the actual Container
|
// Construct the actual Container
|
||||||
ContainerLaunchContext containerLaunchContext =
|
ContainerLaunchContext containerLaunchContext =
|
||||||
event.getContainer();
|
event.getContainerLaunchContext();
|
||||||
|
|
||||||
org.apache.hadoop.yarn.api.records.Container container =
|
|
||||||
BuilderUtils.newContainer(containerID, null, null,
|
|
||||||
event.getResource(), null, containerToken);
|
|
||||||
// Now launch the actual container
|
// Now launch the actual container
|
||||||
StartContainerRequest startRequest = Records
|
StartContainerRequest startRequest = Records
|
||||||
.newRecord(StartContainerRequest.class);
|
.newRecord(StartContainerRequest.class);
|
||||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||||
startRequest.setContainer(container);
|
startRequest.setContainer(event.getAllocatedContainer());
|
||||||
StartContainerResponse response = proxy.startContainer(startRequest);
|
StartContainerResponse response = proxy.startContainer(startRequest);
|
||||||
|
|
||||||
ByteBuffer portInfo = response
|
ByteBuffer portInfo = response
|
||||||
|
|
|
@ -20,35 +20,34 @@ package org.apache.hadoop.mapreduce.v2.app.launcher;
|
||||||
|
|
||||||
import org.apache.hadoop.mapred.Task;
|
import org.apache.hadoop.mapred.Task;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.util.StringInterner;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
|
|
||||||
public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
|
public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
|
||||||
|
|
||||||
private final ContainerLaunchContext container;
|
private final Container allocatedContainer;
|
||||||
|
private final ContainerLaunchContext containerLaunchContext;
|
||||||
private final Task task;
|
private final Task task;
|
||||||
private final Resource resource;
|
|
||||||
|
|
||||||
public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
|
public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
|
||||||
ContainerId containerID, String containerMgrAddress,
|
ContainerLaunchContext containerLaunchContext,
|
||||||
ContainerToken containerToken,
|
Container allocatedContainer, Task remoteTask) {
|
||||||
ContainerLaunchContext containerLaunchContext, Resource resource,
|
super(taskAttemptID, allocatedContainer.getId(), StringInterner
|
||||||
Task remoteTask) {
|
.weakIntern(allocatedContainer.getNodeId().toString()),
|
||||||
super(taskAttemptID, containerID, containerMgrAddress, containerToken,
|
allocatedContainer.getContainerToken(),
|
||||||
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
|
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
|
||||||
this.container = containerLaunchContext;
|
this.allocatedContainer = allocatedContainer;
|
||||||
|
this.containerLaunchContext = containerLaunchContext;
|
||||||
this.task = remoteTask;
|
this.task = remoteTask;
|
||||||
this.resource = resource;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContainerLaunchContext getContainer() {
|
public ContainerLaunchContext getContainerLaunchContext() {
|
||||||
return this.container;
|
return this.containerLaunchContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Resource getResource() {
|
public Container getAllocatedContainer() {
|
||||||
return this.resource;
|
return this.allocatedContainer;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task getRemoteTask() {
|
public Task getRemoteTask() {
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
@ -46,6 +47,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
||||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
|
@ -412,6 +418,39 @@ public class TestMRApp {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Container containerObtainedByContainerLauncher;
|
||||||
|
@Test
|
||||||
|
public void testContainerPassThrough() throws Exception {
|
||||||
|
MRApp app = new MRApp(0, 1, true, this.getClass().getName(), true) {
|
||||||
|
@Override
|
||||||
|
protected ContainerLauncher createContainerLauncher(AppContext context) {
|
||||||
|
return new MockContainerLauncher() {
|
||||||
|
@Override
|
||||||
|
public void handle(ContainerLauncherEvent event) {
|
||||||
|
if (event instanceof ContainerRemoteLaunchEvent) {
|
||||||
|
containerObtainedByContainerLauncher =
|
||||||
|
((ContainerRemoteLaunchEvent) event).getAllocatedContainer();
|
||||||
|
}
|
||||||
|
super.handle(event);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
};
|
||||||
|
Job job = app.submit(new Configuration());
|
||||||
|
app.waitForState(job, JobState.SUCCEEDED);
|
||||||
|
app.verifyCompleted();
|
||||||
|
|
||||||
|
Collection<Task> tasks = job.getTasks().values();
|
||||||
|
Collection<TaskAttempt> taskAttempts =
|
||||||
|
tasks.iterator().next().getAttempts().values();
|
||||||
|
TaskAttemptImpl taskAttempt =
|
||||||
|
(TaskAttemptImpl) taskAttempts.iterator().next();
|
||||||
|
// Container from RM should pass through to the launcher. Container object
|
||||||
|
// should be the same.
|
||||||
|
Assert.assertTrue(taskAttempt.container
|
||||||
|
== containerObtainedByContainerLauncher);
|
||||||
|
}
|
||||||
|
|
||||||
private final class MRAppWithHistory extends MRApp {
|
private final class MRAppWithHistory extends MRApp {
|
||||||
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
|
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
|
||||||
String testName, boolean cleanOnStart, int startCount) {
|
String testName, boolean cleanOnStart, int startCount) {
|
||||||
|
|
|
@ -78,7 +78,8 @@ public class TestMapReduceChildJVM {
|
||||||
public void handle(ContainerLauncherEvent event) {
|
public void handle(ContainerLauncherEvent event) {
|
||||||
if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
|
if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
|
||||||
ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event;
|
ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event;
|
||||||
ContainerLaunchContext launchContext = launchEvent.getContainer();
|
ContainerLaunchContext launchContext =
|
||||||
|
launchEvent.getContainerLaunchContext();
|
||||||
String cmdString = launchContext.getCommands().toString();
|
String cmdString = launchContext.getCommands().toString();
|
||||||
LOG.info("launchContext " + cmdString);
|
LOG.info("launchContext " + cmdString);
|
||||||
myCommandLine = cmdString;
|
myCommandLine = cmdString;
|
||||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
|
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||||
|
@ -224,10 +223,6 @@ public class TestContainerLauncher {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSlowNM() throws Exception {
|
public void testSlowNM() throws Exception {
|
||||||
test();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void test() throws Exception {
|
|
||||||
|
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
int maxAttempts = 1;
|
int maxAttempts = 1;
|
||||||
|
@ -382,6 +377,15 @@ public class TestContainerLauncher {
|
||||||
@Override
|
@Override
|
||||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
public StartContainerResponse startContainer(StartContainerRequest request)
|
||||||
throws YarnRemoteException {
|
throws YarnRemoteException {
|
||||||
|
|
||||||
|
// Validate that the container is what RM is giving.
|
||||||
|
Assert.assertEquals(MRApp.NM_HOST, request.getContainer().getNodeId()
|
||||||
|
.getHost());
|
||||||
|
Assert.assertEquals(MRApp.NM_PORT, request.getContainer().getNodeId()
|
||||||
|
.getPort());
|
||||||
|
Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_HTTP_PORT, request
|
||||||
|
.getContainer().getNodeHttpAddress());
|
||||||
|
|
||||||
StartContainerResponse response = recordFactory
|
StartContainerResponse response = recordFactory
|
||||||
.newRecordInstance(StartContainerResponse.class);
|
.newRecordInstance(StartContainerResponse.class);
|
||||||
status = recordFactory.newRecordInstance(ContainerStatus.class);
|
status = recordFactory.newRecordInstance(ContainerStatus.class);
|
||||||
|
|
Loading…
Reference in New Issue