From f49cec888a2ef91d0ace1db51123641ddb30e42c Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Tue, 4 Apr 2017 15:48:35 -0700 Subject: [PATCH] MAPREDUCE-6785. ContainerLauncherImpl support for reusing the containers. Contributed by Naganarasimha G R. --- .../TaskAttemptContainerAssignedEvent.java | 9 ++ .../v2/app/job/impl/TaskAttemptImpl.java | 99 ++++++++++--------- .../v2/app/rm/ContainerRequestor.java | 6 ++ .../v2/app/rm/RMContainerAllocator.java | 10 +- .../v2/app/rm/RMContainerRequestor.java | 30 +++++- .../v2/app/rm/RMContainerReuseRequestor.java | 62 +++++++++--- .../app/rm/TestRMContainerReuseRequestor.java | 30 ++++-- 7 files changed, 169 insertions(+), 77 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java index 0f69fa8c206..41dfb0370d2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java @@ -28,6 +28,7 @@ public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent { private final Container container; private final Map applicationACLs; + private int shufflePort = -1; public TaskAttemptContainerAssignedEvent(TaskAttemptId id, Container container, Map applicationACLs) { @@ -36,6 +37,14 @@ public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent { this.applicationACLs = applicationACLs; } + public int getShufflePort() { + return shufflePort; + } + + public void setShufflePort(int shufflePort) { + this.shufflePort = shufflePort; + } + public Container getContainer() { return this.container; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 3943a3aa913..1f9ea90c7cc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -265,7 +265,8 @@ public abstract class TaskAttemptImpl implements // Transitions from the UNASSIGNED state. .addTransition(TaskAttemptStateInternal.UNASSIGNED, - TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED, + EnumSet.of(TaskAttemptStateInternal.ASSIGNED, + TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_ASSIGNED, new ContainerAssignedTransition()) .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition( @@ -1876,13 +1877,14 @@ public abstract class TaskAttemptImpl implements } private static class ContainerAssignedTransition implements - SingleArcTransition { + MultipleArcTransition { @SuppressWarnings({ "unchecked" }) @Override - public void transition(final TaskAttemptImpl taskAttempt, - TaskAttemptEvent event) { - final TaskAttemptContainerAssignedEvent cEvent = - (TaskAttemptContainerAssignedEvent) event; + public TaskAttemptStateInternal transition( + final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { + final TaskAttemptContainerAssignedEvent cEvent = + (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): @@ -1895,20 +1897,26 @@ public abstract class TaskAttemptImpl implements taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); - - //launch the container - //create the container object to be launched for a given Task attempt - ContainerLaunchContext launchContext = createContainerLaunchContext( - cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, - taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, - taskAttempt.taskAttemptListener, taskAttempt.credentials); - taskAttempt.eventHandler - .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, - launchContext, container, taskAttempt.remoteTask)); - // send event to speculator that our container needs are satisfied - taskAttempt.eventHandler.handle - (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); + if (cEvent.getShufflePort() == -1) { + // launch the container + // create the container object to be launched for a given Task attempt + ContainerLaunchContext launchContext = createContainerLaunchContext( + cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, + taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, + taskAttempt.taskAttemptListener, taskAttempt.credentials); + taskAttempt.eventHandler + .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, + launchContext, container, taskAttempt.remoteTask)); + + // send event to speculator that our container needs are satisfied + taskAttempt.eventHandler + .handle(new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); + return TaskAttemptStateInternal.ASSIGNED; + } else { + taskAttempt.onContainerLaunch(cEvent.getShufflePort()); + return TaskAttemptStateInternal.RUNNING; + } } } @@ -1982,7 +1990,6 @@ public abstract class TaskAttemptImpl implements private static class LaunchedContainerTransition implements SingleArcTransition { - @SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent evnt) { @@ -1990,34 +1997,34 @@ public abstract class TaskAttemptImpl implements TaskAttemptContainerLaunchedEvent event = (TaskAttemptContainerLaunchedEvent) evnt; - //set the launch time - taskAttempt.launchTime = taskAttempt.clock.getTime(); - taskAttempt.shufflePort = event.getShufflePort(); - - // register it to TaskAttemptListener so that it can start monitoring it. - taskAttempt.taskAttemptListener - .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); - - //TODO Resolve to host / IP in case of a local address. - InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr? - NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress()); - taskAttempt.trackerName = nodeHttpInetAddr.getHostName(); - taskAttempt.httpPort = nodeHttpInetAddr.getPort(); - taskAttempt.sendLaunchedEvents(); - taskAttempt.eventHandler.handle - (new SpeculatorEvent - (taskAttempt.attemptId, true, taskAttempt.clock.getTime())); - //make remoteTask reference as null as it is no more needed - //and free up the memory - taskAttempt.remoteTask = null; - - //tell the Task that attempt has started - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_LAUNCHED)); + taskAttempt.onContainerLaunch(event.getShufflePort()); } } - + + @SuppressWarnings("unchecked") + private void onContainerLaunch(int shufflePortParam) { + // set the launch time + launchTime = clock.getTime(); + this.shufflePort = shufflePortParam; + + // register it to TaskAttemptListener so that it can start monitoring it. + taskAttemptListener.registerLaunchedTask(attemptId, jvmID); + // TODO Resolve to host / IP in case of a local address. + InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr? + NetUtils.createSocketAddr(container.getNodeHttpAddress()); + trackerName = nodeHttpInetAddr.getHostName(); + httpPort = nodeHttpInetAddr.getPort(); + sendLaunchedEvents(); + eventHandler.handle(new SpeculatorEvent(attemptId, true, clock.getTime())); + // make remoteTask reference as null as it is no more needed + // and free up the memory + remoteTask = null; + + // tell the Task that attempt has started + eventHandler.handle( + new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_LAUNCHED)); + } + private static class CommitPendingTransition implements SingleArcTransition { @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java index 2d546334290..9842e0d1512 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java @@ -18,9 +18,12 @@ package org.apache.hadoop.mapreduce.v2.app.rm; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -38,6 +41,9 @@ public interface ContainerRequestor { void decContainerReq(ContainerRequest request); + void containerAssigned(Container allocated, ContainerRequest assigned, + Map acls); + void release(ContainerId containerId); boolean isNodeBlacklisted(String hostname); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 7d0b4b72fb7..b6fd1fa9372 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -53,7 +53,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; @@ -257,7 +256,7 @@ public class RMContainerAllocator extends RMCommunicator dispatcher.register(RMContainerReuseRequestor.EventType.class, (RMContainerReuseRequestor) containerRequestor); } else { - containerRequestor = new RMContainerRequestor(this); + containerRequestor = new RMContainerRequestor(eventHandler, this); } containerRequestor.init(conf); } @@ -1298,11 +1297,8 @@ public class RMContainerAllocator extends RMCommunicator private void containerAssigned(Container allocated, ContainerRequest assigned) { // Update resource requests - containerRequestor.decContainerReq(assigned); - - // send the container-assigned event to task attempt - eventHandler.handle(new TaskAttemptContainerAssignedEvent( - assigned.attemptID, allocated, applicationACLs)); + containerRequestor.containerAssigned(allocated, assigned, + applicationACLs); assignedRequests.add(allocated, assigned.attemptID); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 82ef24f4a08..f68227b0014 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -34,25 +34,29 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,11 +118,16 @@ public class RMContainerRequestor extends AbstractService .newSetFromMap(new ConcurrentHashMap()); private final ApplicationId applicationId; private final RMCommunicator rmCommunicator; + @SuppressWarnings("rawtypes") + private EventHandler eventHandler; - public RMContainerRequestor(RMCommunicator rmCommunicator) { + @SuppressWarnings("rawtypes") + public RMContainerRequestor(EventHandler eventHandler, + RMCommunicator rmCommunicator) { super(RMContainerRequestor.class.getName()); this.rmCommunicator = rmCommunicator; applicationId = rmCommunicator.applicationId; + this.eventHandler = eventHandler; } @Private @@ -424,17 +433,28 @@ public class RMContainerRequestor extends AbstractService req.nodeLabelExpression); } + @SuppressWarnings("unchecked") + @Override + public void containerAssigned(Container allocated, ContainerRequest req, + Map applicationACLs) { + decContainerReq(req); + + // send the container-assigned event to task attempt + eventHandler.handle(new TaskAttemptContainerAssignedEvent( + req.attemptID, allocated, applicationACLs)); + } + @Override public void decContainerReq(ContainerRequest req) { // Update resource requests for (String hostName : req.hosts) { decResourceRequest(req.priority, hostName, req.capability); } - + for (String rack : req.racks) { decResourceRequest(req.priority, rack, req.capability); } - + decResourceRequest(req.priority, ResourceRequest.ANY, req.capability); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java index 7559693e4a4..2bdfa9128d9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java @@ -34,7 +34,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; @@ -53,8 +55,8 @@ public class RMContainerReuseRequestor extends RMContainerRequestor private static final Log LOG = LogFactory .getLog(RMContainerReuseRequestor.class); - private Map containersToReuse = - new ConcurrentHashMap(); + private Map containersToReuse = + new ConcurrentHashMap<>(); private Map> containerToTaskAttemptsMap = new HashMap>(); private int containerReuseMaxMapTasks; @@ -63,14 +65,17 @@ public class RMContainerReuseRequestor extends RMContainerRequestor private int maxReduceTaskContainers; private int noOfMapTaskContainersForReuse; private int noOfReduceTaskContainersForReuse; + private final RMCommunicator rmCommunicator; + @SuppressWarnings("rawtypes") + private final EventHandler eventHandler; - private RMCommunicator rmCommunicator; - + @SuppressWarnings("rawtypes") public RMContainerReuseRequestor( - EventHandler eventHandler, + EventHandler eventHandler, RMCommunicator rmCommunicator) { - super(rmCommunicator); + super(eventHandler, rmCommunicator); this.rmCommunicator = rmCommunicator; + this.eventHandler = eventHandler; } @Override @@ -113,8 +118,8 @@ public class RMContainerReuseRequestor extends RMContainerRequestor boolean blacklisted = super.isNodeBlacklisted(hostName); if (blacklisted) { Set containersOnHost = new HashSet(); - for (Entry elem : containersToReuse.entrySet()) { - if (elem.getValue().equals(hostName)) { + for (Entry elem : containersToReuse.entrySet()) { + if (elem.getValue().getHost().equals(hostName)) { containersOnHost.add(elem.getKey()); } } @@ -139,6 +144,7 @@ public class RMContainerReuseRequestor extends RMContainerRequestor containerTaskAttempts = new ArrayList(); containerToTaskAttemptsMap.put(containerId, containerTaskAttempts); } + TaskAttemptId taskAttemptId = event.getTaskAttemptId(); if (checkMapContainerReuseConstraints(priority, containerTaskAttempts) || checkReduceContainerReuseConstraints(priority, containerTaskAttempts)) { @@ -147,13 +153,17 @@ public class RMContainerReuseRequestor extends RMContainerRequestor // If there are any eligible requests if (resourceRequests != null && !resourceRequests.isEmpty()) { canReuse = true; - containerTaskAttempts.add(event.getTaskAttemptId()); + containerTaskAttempts.add(taskAttemptId); } } ((RMContainerAllocator) rmCommunicator) .resetContainerForReuse(container.getId()); if (canReuse) { - containersToReuse.put(container, resourceName); + int shufflePort = + rmCommunicator.getJob().getTask(taskAttemptId.getTaskId()) + .getAttempt(taskAttemptId).getShufflePort(); + containersToReuse.put(container, + new HostInfo(resourceName, shufflePort)); incrementRunningReuseContainers(priority); LOG.info("Adding the " + containerId + " for reuse."); } else { @@ -211,7 +221,7 @@ public class RMContainerReuseRequestor extends RMContainerRequestor @Private @VisibleForTesting - Map getContainersToReuse() { + Map getContainersToReuse() { return containersToReuse; } @@ -221,4 +231,34 @@ public class RMContainerReuseRequestor extends RMContainerRequestor public static enum EventType { CONTAINER_AVAILABLE } + + @SuppressWarnings("unchecked") + @Override + public void containerAssigned(Container allocated, ContainerRequest req, + Map applicationACLs) { + if(containersToReuse.containsKey(allocated)){ + decContainerReq(req); + // send the container-assigned event to task attempt + eventHandler.handle(new TaskAttemptContainerAssignedEvent( + req.attemptID, allocated, applicationACLs)); + } else { + super.containerAssigned(allocated, req, applicationACLs); + } + } + + static class HostInfo { + private String host; + private int port; + public HostInfo(String host, int port) { + super(); + this.host = host; + this.port = port; + } + public String getHost() { + return host; + } + public int getPort() { + return port; + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java index d747e7463b2..2ca7cc812cb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java @@ -18,7 +18,9 @@ package org.apache.hadoop.mapreduce.v2.app.rm; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Map; @@ -29,8 +31,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.EventType; +import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.HostInfo; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -51,8 +57,16 @@ public class TestRMContainerReuseRequestor { @Before public void setup() throws IOException { + RMContainerAllocator allocator = mock(RMContainerAllocator.class); + Job job = mock(Job.class); + Task task = mock(Task.class); + TaskAttempt taskAttempt = mock(TaskAttempt.class); + when(taskAttempt.getShufflePort()).thenReturn(0); + when(task.getAttempt(any(TaskAttemptId.class))).thenReturn(taskAttempt); + when(job.getTask(any(TaskId.class))).thenReturn(task); + when(allocator.getJob()).thenReturn(job); reuseRequestor = new RMContainerReuseRequestor(null, - mock(RMContainerAllocator.class)); + allocator); } @Test @@ -138,14 +152,14 @@ public class TestRMContainerReuseRequestor { @Test public void testContainerFailedOnHost() throws Exception { reuseRequestor.serviceInit(new Configuration()); - Map containersToReuse = reuseRequestor + Map containersToReuse = reuseRequestor .getContainersToReuse(); containersToReuse .put(newContainerInstance("container_1472171035081_0009_01_000008", - RMContainerAllocator.PRIORITY_REDUCE), "node1"); + RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node1", 1999)); containersToReuse .put(newContainerInstance("container_1472171035081_0009_01_000009", - RMContainerAllocator.PRIORITY_REDUCE), "node2"); + RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node2", 1999)); reuseRequestor.getBlacklistedNodes().add("node1"); // It removes all containers from containersToReuse running in node1 reuseRequestor.containerFailedOnHost("node1"); @@ -172,7 +186,7 @@ public class TestRMContainerReuseRequestor { ContainerAvailableEvent event = new ContainerAvailableEvent( EventType.CONTAINER_AVAILABLE, taskAttemptId, container); reuseRequestor.handle(event); - Map containersToReuse = reuseRequestor + Map containersToReuse = reuseRequestor .getContainersToReuse(); Assert.assertTrue("Container should be added for reuse.", containersToReuse.containsKey(container)); @@ -206,7 +220,7 @@ public class TestRMContainerReuseRequestor { ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType, taskAttemptId1, container); reuseRequestor.handle(event1); - Map containersToReuse = reuseRequestor + Map containersToReuse = reuseRequestor .getContainersToReuse(); // It is reusing the container Assert.assertTrue("Container should be added for reuse.", @@ -236,7 +250,7 @@ public class TestRMContainerReuseRequestor { ContainerAvailableEvent event1 = new ContainerAvailableEvent( EventType.CONTAINER_AVAILABLE, taskAttemptId1, container); reuseRequestor.handle(event1); - Map containersToReuse = reuseRequestor + Map containersToReuse = reuseRequestor .getContainersToReuse(); Assert.assertTrue("Container should be added for reuse.", containersToReuse.containsKey(container)); @@ -269,7 +283,7 @@ public class TestRMContainerReuseRequestor { ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType, taskAttemptId1, container1); reuseRequestor.handle(event1); - Map containersToReuse = reuseRequestor + Map containersToReuse = reuseRequestor .getContainersToReuse(); Assert.assertTrue("Container should be added for reuse.", containersToReuse.containsKey(container1));