MAPREDUCE-6785. ContainerLauncherImpl support for reusing the containers.

Contributed by Naganarasimha G R.
This commit is contained in:
Devaraj K 2017-04-04 15:48:35 -07:00 committed by bilwa
parent e274d508ff
commit f49cec888a
7 changed files with 169 additions and 77 deletions

View File

@ -28,6 +28,7 @@ public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent {
private final Container container; private final Container container;
private final Map<ApplicationAccessType, String> applicationACLs; private final Map<ApplicationAccessType, String> applicationACLs;
private int shufflePort = -1;
public TaskAttemptContainerAssignedEvent(TaskAttemptId id, public TaskAttemptContainerAssignedEvent(TaskAttemptId id,
Container container, Map<ApplicationAccessType, String> applicationACLs) { Container container, Map<ApplicationAccessType, String> applicationACLs) {
@ -36,6 +37,14 @@ public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent {
this.applicationACLs = applicationACLs; this.applicationACLs = applicationACLs;
} }
public int getShufflePort() {
return shufflePort;
}
public void setShufflePort(int shufflePort) {
this.shufflePort = shufflePort;
}
public Container getContainer() { public Container getContainer() {
return this.container; return this.container;
} }

View File

@ -265,7 +265,8 @@ public abstract class TaskAttemptImpl implements
// Transitions from the UNASSIGNED state. // Transitions from the UNASSIGNED state.
.addTransition(TaskAttemptStateInternal.UNASSIGNED, .addTransition(TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED, EnumSet.of(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_ASSIGNED,
new ContainerAssignedTransition()) new ContainerAssignedTransition())
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED, .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition( TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
@ -1876,13 +1877,14 @@ public abstract class TaskAttemptImpl implements
} }
private static class ContainerAssignedTransition implements private static class ContainerAssignedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
TaskAttemptStateInternal> {
@SuppressWarnings({ "unchecked" }) @SuppressWarnings({ "unchecked" })
@Override @Override
public void transition(final TaskAttemptImpl taskAttempt, public TaskAttemptStateInternal transition(
TaskAttemptEvent event) { final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
final TaskAttemptContainerAssignedEvent cEvent = final TaskAttemptContainerAssignedEvent cEvent =
(TaskAttemptContainerAssignedEvent) event; (TaskAttemptContainerAssignedEvent) event;
Container container = cEvent.getContainer(); Container container = cEvent.getContainer();
taskAttempt.container = container; taskAttempt.container = container;
// this is a _real_ Task (classic Hadoop mapred flavor): // this is a _real_ Task (classic Hadoop mapred flavor):
@ -1895,20 +1897,26 @@ public abstract class TaskAttemptImpl implements
taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.remoteTask, taskAttempt.jvmID);
taskAttempt.computeRackAndLocality(); taskAttempt.computeRackAndLocality();
//launch the container
//create the container object to be launched for a given Task attempt
ContainerLaunchContext launchContext = createContainerLaunchContext(
cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
taskAttempt.taskAttemptListener, taskAttempt.credentials);
taskAttempt.eventHandler
.handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
launchContext, container, taskAttempt.remoteTask));
// send event to speculator that our container needs are satisfied if (cEvent.getShufflePort() == -1) {
taskAttempt.eventHandler.handle // launch the container
(new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); // create the container object to be launched for a given Task attempt
ContainerLaunchContext launchContext = createContainerLaunchContext(
cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
taskAttempt.taskAttemptListener, taskAttempt.credentials);
taskAttempt.eventHandler
.handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
launchContext, container, taskAttempt.remoteTask));
// send event to speculator that our container needs are satisfied
taskAttempt.eventHandler
.handle(new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
return TaskAttemptStateInternal.ASSIGNED;
} else {
taskAttempt.onContainerLaunch(cEvent.getShufflePort());
return TaskAttemptStateInternal.RUNNING;
}
} }
} }
@ -1982,7 +1990,6 @@ public abstract class TaskAttemptImpl implements
private static class LaunchedContainerTransition implements private static class LaunchedContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override @Override
public void transition(TaskAttemptImpl taskAttempt, public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent evnt) { TaskAttemptEvent evnt) {
@ -1990,34 +1997,34 @@ public abstract class TaskAttemptImpl implements
TaskAttemptContainerLaunchedEvent event = TaskAttemptContainerLaunchedEvent event =
(TaskAttemptContainerLaunchedEvent) evnt; (TaskAttemptContainerLaunchedEvent) evnt;
//set the launch time taskAttempt.onContainerLaunch(event.getShufflePort());
taskAttempt.launchTime = taskAttempt.clock.getTime();
taskAttempt.shufflePort = event.getShufflePort();
// register it to TaskAttemptListener so that it can start monitoring it.
taskAttempt.taskAttemptListener
.registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
//TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
taskAttempt.sendLaunchedEvents();
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.attemptId, true, taskAttempt.clock.getTime()));
//make remoteTask reference as null as it is no more needed
//and free up the memory
taskAttempt.remoteTask = null;
//tell the Task that attempt has started
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_LAUNCHED));
} }
} }
@SuppressWarnings("unchecked")
private void onContainerLaunch(int shufflePortParam) {
// set the launch time
launchTime = clock.getTime();
this.shufflePort = shufflePortParam;
// register it to TaskAttemptListener so that it can start monitoring it.
taskAttemptListener.registerLaunchedTask(attemptId, jvmID);
// TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
NetUtils.createSocketAddr(container.getNodeHttpAddress());
trackerName = nodeHttpInetAddr.getHostName();
httpPort = nodeHttpInetAddr.getPort();
sendLaunchedEvents();
eventHandler.handle(new SpeculatorEvent(attemptId, true, clock.getTime()));
// make remoteTask reference as null as it is no more needed
// and free up the memory
remoteTask = null;
// tell the Task that attempt has started
eventHandler.handle(
new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_LAUNCHED));
}
private static class CommitPendingTransition implements private static class CommitPendingTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.mapreduce.v2.app.rm; package org.apache.hadoop.mapreduce.v2.app.rm;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -38,6 +41,9 @@ public interface ContainerRequestor {
void decContainerReq(ContainerRequest request); void decContainerReq(ContainerRequest request);
void containerAssigned(Container allocated, ContainerRequest assigned,
Map<ApplicationAccessType, String> acls);
void release(ContainerId containerId); void release(ContainerId containerId);
boolean isNodeBlacklisted(String hostname); boolean isNodeBlacklisted(String hostname);

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@ -257,7 +256,7 @@ public class RMContainerAllocator extends RMCommunicator
dispatcher.register(RMContainerReuseRequestor.EventType.class, dispatcher.register(RMContainerReuseRequestor.EventType.class,
(RMContainerReuseRequestor) containerRequestor); (RMContainerReuseRequestor) containerRequestor);
} else { } else {
containerRequestor = new RMContainerRequestor(this); containerRequestor = new RMContainerRequestor(eventHandler, this);
} }
containerRequestor.init(conf); containerRequestor.init(conf);
} }
@ -1298,11 +1297,8 @@ public class RMContainerAllocator extends RMCommunicator
private void containerAssigned(Container allocated, private void containerAssigned(Container allocated,
ContainerRequest assigned) { ContainerRequest assigned) {
// Update resource requests // Update resource requests
containerRequestor.decContainerReq(assigned); containerRequestor.containerAssigned(allocated, assigned,
applicationACLs);
// send the container-assigned event to task attempt
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
assigned.attemptID, allocated, applicationACLs));
assignedRequests.add(allocated, assigned.attemptID); assignedRequests.add(allocated, assigned.attemptID);

View File

@ -34,25 +34,29 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator; import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -114,11 +118,16 @@ public class RMContainerRequestor extends AbstractService
.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final ApplicationId applicationId; private final ApplicationId applicationId;
private final RMCommunicator rmCommunicator; private final RMCommunicator rmCommunicator;
@SuppressWarnings("rawtypes")
private EventHandler eventHandler;
public RMContainerRequestor(RMCommunicator rmCommunicator) { @SuppressWarnings("rawtypes")
public RMContainerRequestor(EventHandler eventHandler,
RMCommunicator rmCommunicator) {
super(RMContainerRequestor.class.getName()); super(RMContainerRequestor.class.getName());
this.rmCommunicator = rmCommunicator; this.rmCommunicator = rmCommunicator;
applicationId = rmCommunicator.applicationId; applicationId = rmCommunicator.applicationId;
this.eventHandler = eventHandler;
} }
@Private @Private
@ -424,17 +433,28 @@ public class RMContainerRequestor extends AbstractService
req.nodeLabelExpression); req.nodeLabelExpression);
} }
@SuppressWarnings("unchecked")
@Override
public void containerAssigned(Container allocated, ContainerRequest req,
Map<ApplicationAccessType, String> applicationACLs) {
decContainerReq(req);
// send the container-assigned event to task attempt
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
req.attemptID, allocated, applicationACLs));
}
@Override @Override
public void decContainerReq(ContainerRequest req) { public void decContainerReq(ContainerRequest req) {
// Update resource requests // Update resource requests
for (String hostName : req.hosts) { for (String hostName : req.hosts) {
decResourceRequest(req.priority, hostName, req.capability); decResourceRequest(req.priority, hostName, req.capability);
} }
for (String rack : req.racks) { for (String rack : req.racks) {
decResourceRequest(req.priority, rack, req.capability); decResourceRequest(req.priority, rack, req.capability);
} }
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability); decResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
} }

View File

@ -34,7 +34,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
@ -53,8 +55,8 @@ public class RMContainerReuseRequestor extends RMContainerRequestor
private static final Log LOG = LogFactory private static final Log LOG = LogFactory
.getLog(RMContainerReuseRequestor.class); .getLog(RMContainerReuseRequestor.class);
private Map<Container, String> containersToReuse = private Map<Container, HostInfo> containersToReuse =
new ConcurrentHashMap<Container, String>(); new ConcurrentHashMap<>();
private Map<ContainerId, List<TaskAttemptId>> containerToTaskAttemptsMap = private Map<ContainerId, List<TaskAttemptId>> containerToTaskAttemptsMap =
new HashMap<ContainerId, List<TaskAttemptId>>(); new HashMap<ContainerId, List<TaskAttemptId>>();
private int containerReuseMaxMapTasks; private int containerReuseMaxMapTasks;
@ -63,14 +65,17 @@ public class RMContainerReuseRequestor extends RMContainerRequestor
private int maxReduceTaskContainers; private int maxReduceTaskContainers;
private int noOfMapTaskContainersForReuse; private int noOfMapTaskContainersForReuse;
private int noOfReduceTaskContainersForReuse; private int noOfReduceTaskContainersForReuse;
private final RMCommunicator rmCommunicator;
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
private RMCommunicator rmCommunicator; @SuppressWarnings("rawtypes")
public RMContainerReuseRequestor( public RMContainerReuseRequestor(
EventHandler<ContainerAvailableEvent> eventHandler, EventHandler eventHandler,
RMCommunicator rmCommunicator) { RMCommunicator rmCommunicator) {
super(rmCommunicator); super(eventHandler, rmCommunicator);
this.rmCommunicator = rmCommunicator; this.rmCommunicator = rmCommunicator;
this.eventHandler = eventHandler;
} }
@Override @Override
@ -113,8 +118,8 @@ public class RMContainerReuseRequestor extends RMContainerRequestor
boolean blacklisted = super.isNodeBlacklisted(hostName); boolean blacklisted = super.isNodeBlacklisted(hostName);
if (blacklisted) { if (blacklisted) {
Set<Container> containersOnHost = new HashSet<Container>(); Set<Container> containersOnHost = new HashSet<Container>();
for (Entry<Container, String> elem : containersToReuse.entrySet()) { for (Entry<Container, HostInfo> elem : containersToReuse.entrySet()) {
if (elem.getValue().equals(hostName)) { if (elem.getValue().getHost().equals(hostName)) {
containersOnHost.add(elem.getKey()); containersOnHost.add(elem.getKey());
} }
} }
@ -139,6 +144,7 @@ public class RMContainerReuseRequestor extends RMContainerRequestor
containerTaskAttempts = new ArrayList<TaskAttemptId>(); containerTaskAttempts = new ArrayList<TaskAttemptId>();
containerToTaskAttemptsMap.put(containerId, containerTaskAttempts); containerToTaskAttemptsMap.put(containerId, containerTaskAttempts);
} }
TaskAttemptId taskAttemptId = event.getTaskAttemptId();
if (checkMapContainerReuseConstraints(priority, containerTaskAttempts) if (checkMapContainerReuseConstraints(priority, containerTaskAttempts)
|| checkReduceContainerReuseConstraints(priority, || checkReduceContainerReuseConstraints(priority,
containerTaskAttempts)) { containerTaskAttempts)) {
@ -147,13 +153,17 @@ public class RMContainerReuseRequestor extends RMContainerRequestor
// If there are any eligible requests // If there are any eligible requests
if (resourceRequests != null && !resourceRequests.isEmpty()) { if (resourceRequests != null && !resourceRequests.isEmpty()) {
canReuse = true; canReuse = true;
containerTaskAttempts.add(event.getTaskAttemptId()); containerTaskAttempts.add(taskAttemptId);
} }
} }
((RMContainerAllocator) rmCommunicator) ((RMContainerAllocator) rmCommunicator)
.resetContainerForReuse(container.getId()); .resetContainerForReuse(container.getId());
if (canReuse) { if (canReuse) {
containersToReuse.put(container, resourceName); int shufflePort =
rmCommunicator.getJob().getTask(taskAttemptId.getTaskId())
.getAttempt(taskAttemptId).getShufflePort();
containersToReuse.put(container,
new HostInfo(resourceName, shufflePort));
incrementRunningReuseContainers(priority); incrementRunningReuseContainers(priority);
LOG.info("Adding the " + containerId + " for reuse."); LOG.info("Adding the " + containerId + " for reuse.");
} else { } else {
@ -211,7 +221,7 @@ public class RMContainerReuseRequestor extends RMContainerRequestor
@Private @Private
@VisibleForTesting @VisibleForTesting
Map<Container, String> getContainersToReuse() { Map<Container, HostInfo> getContainersToReuse() {
return containersToReuse; return containersToReuse;
} }
@ -221,4 +231,34 @@ public class RMContainerReuseRequestor extends RMContainerRequestor
public static enum EventType { public static enum EventType {
CONTAINER_AVAILABLE CONTAINER_AVAILABLE
} }
@SuppressWarnings("unchecked")
@Override
public void containerAssigned(Container allocated, ContainerRequest req,
Map<ApplicationAccessType, String> applicationACLs) {
if(containersToReuse.containsKey(allocated)){
decContainerReq(req);
// send the container-assigned event to task attempt
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
req.attemptID, allocated, applicationACLs));
} else {
super.containerAssigned(allocated, req, applicationACLs);
}
}
static class HostInfo {
private String host;
private int port;
public HostInfo(String host, int port) {
super();
this.host = host;
this.port = port;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
}
} }

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.mapreduce.v2.app.rm; package org.apache.hadoop.mapreduce.v2.app.rm;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
@ -29,8 +31,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.EventType; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.EventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.HostInfo;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -51,8 +57,16 @@ public class TestRMContainerReuseRequestor {
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
RMContainerAllocator allocator = mock(RMContainerAllocator.class);
Job job = mock(Job.class);
Task task = mock(Task.class);
TaskAttempt taskAttempt = mock(TaskAttempt.class);
when(taskAttempt.getShufflePort()).thenReturn(0);
when(task.getAttempt(any(TaskAttemptId.class))).thenReturn(taskAttempt);
when(job.getTask(any(TaskId.class))).thenReturn(task);
when(allocator.getJob()).thenReturn(job);
reuseRequestor = new RMContainerReuseRequestor(null, reuseRequestor = new RMContainerReuseRequestor(null,
mock(RMContainerAllocator.class)); allocator);
} }
@Test @Test
@ -138,14 +152,14 @@ public class TestRMContainerReuseRequestor {
@Test @Test
public void testContainerFailedOnHost() throws Exception { public void testContainerFailedOnHost() throws Exception {
reuseRequestor.serviceInit(new Configuration()); reuseRequestor.serviceInit(new Configuration());
Map<Container, String> containersToReuse = reuseRequestor Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse(); .getContainersToReuse();
containersToReuse containersToReuse
.put(newContainerInstance("container_1472171035081_0009_01_000008", .put(newContainerInstance("container_1472171035081_0009_01_000008",
RMContainerAllocator.PRIORITY_REDUCE), "node1"); RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node1", 1999));
containersToReuse containersToReuse
.put(newContainerInstance("container_1472171035081_0009_01_000009", .put(newContainerInstance("container_1472171035081_0009_01_000009",
RMContainerAllocator.PRIORITY_REDUCE), "node2"); RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node2", 1999));
reuseRequestor.getBlacklistedNodes().add("node1"); reuseRequestor.getBlacklistedNodes().add("node1");
// It removes all containers from containersToReuse running in node1 // It removes all containers from containersToReuse running in node1
reuseRequestor.containerFailedOnHost("node1"); reuseRequestor.containerFailedOnHost("node1");
@ -172,7 +186,7 @@ public class TestRMContainerReuseRequestor {
ContainerAvailableEvent event = new ContainerAvailableEvent( ContainerAvailableEvent event = new ContainerAvailableEvent(
EventType.CONTAINER_AVAILABLE, taskAttemptId, container); EventType.CONTAINER_AVAILABLE, taskAttemptId, container);
reuseRequestor.handle(event); reuseRequestor.handle(event);
Map<Container, String> containersToReuse = reuseRequestor Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse(); .getContainersToReuse();
Assert.assertTrue("Container should be added for reuse.", Assert.assertTrue("Container should be added for reuse.",
containersToReuse.containsKey(container)); containersToReuse.containsKey(container));
@ -206,7 +220,7 @@ public class TestRMContainerReuseRequestor {
ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType, ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
taskAttemptId1, container); taskAttemptId1, container);
reuseRequestor.handle(event1); reuseRequestor.handle(event1);
Map<Container, String> containersToReuse = reuseRequestor Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse(); .getContainersToReuse();
// It is reusing the container // It is reusing the container
Assert.assertTrue("Container should be added for reuse.", Assert.assertTrue("Container should be added for reuse.",
@ -236,7 +250,7 @@ public class TestRMContainerReuseRequestor {
ContainerAvailableEvent event1 = new ContainerAvailableEvent( ContainerAvailableEvent event1 = new ContainerAvailableEvent(
EventType.CONTAINER_AVAILABLE, taskAttemptId1, container); EventType.CONTAINER_AVAILABLE, taskAttemptId1, container);
reuseRequestor.handle(event1); reuseRequestor.handle(event1);
Map<Container, String> containersToReuse = reuseRequestor Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse(); .getContainersToReuse();
Assert.assertTrue("Container should be added for reuse.", Assert.assertTrue("Container should be added for reuse.",
containersToReuse.containsKey(container)); containersToReuse.containsKey(container));
@ -269,7 +283,7 @@ public class TestRMContainerReuseRequestor {
ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType, ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
taskAttemptId1, container1); taskAttemptId1, container1);
reuseRequestor.handle(event1); reuseRequestor.handle(event1);
Map<Container, String> containersToReuse = reuseRequestor Map<Container, HostInfo> containersToReuse = reuseRequestor
.getContainersToReuse(); .getContainersToReuse();
Assert.assertTrue("Container should be added for reuse.", Assert.assertTrue("Container should be added for reuse.",
containersToReuse.containsKey(container1)); containersToReuse.containsKey(container1));