diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index bc993a28f9c..583e774156a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -972,7 +972,7 @@ public class MRAppMaster extends CompositeService { , containerID); } else { this.containerAllocator = new RMContainerAllocator( - this.clientService, this.context, preemptionPolicy); + this.clientService, this.context, preemptionPolicy, dispatcher); } ((Service)this.containerAllocator).init(getConfig()); ((Service)this.containerAllocator).start(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAvailableEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAvailableEvent.java new file mode 100644 index 00000000000..9b56b5e0d3f --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAvailableEvent.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.v2.app.rm; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.event.AbstractEvent; + +/** + * Event class for ContainerRequestor. + */ +public class ContainerAvailableEvent + extends AbstractEvent { + + private final TaskAttemptId taskAttemptId; + private final Container container; + + public ContainerAvailableEvent(RMContainerReuseRequestor.EventType eventType, + TaskAttemptId taskAttemptId, Container container) { + super(eventType); + this.taskAttemptId = taskAttemptId; + this.container = container; + } + + public TaskAttemptId getTaskAttemptId() { + return taskAttemptId; + } + + public Container getContainer() { + return container; + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 5b8f2455a26..7d0b4b72fb7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.NMTokenCache; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; @@ -198,10 +199,13 @@ public class RMContainerAllocator extends RMCommunicator private String reduceNodeLabelExpression; + private Dispatcher dispatcher; + public RMContainerAllocator(ClientService clientService, AppContext context, - AMPreemptionPolicy preemptionPolicy) { + AMPreemptionPolicy preemptionPolicy, Dispatcher dispatcher) { super(clientService, context); this.preemptionPolicy = preemptionPolicy; + this.dispatcher = dispatcher; this.stopped = new AtomicBoolean(false); this.clock = context.getClock(); this.assignedRequests = createAssignedRequests(); @@ -247,7 +251,14 @@ public class RMContainerAllocator extends RMCommunicator MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT)); LOG.info(this.scheduledRequests.getNumOpportunisticMapsPercent() + "% of the mappers will be scheduled using OPPORTUNISTIC containers"); - containerRequestor = new RMContainerRequestor(this); + if (conf.getBoolean(MRJobConfig.MR_AM_CONTAINER_REUSE_ENABLED, + MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_ENABLED)) { + containerRequestor = new RMContainerReuseRequestor(eventHandler, this); + dispatcher.register(RMContainerReuseRequestor.EventType.class, + (RMContainerReuseRequestor) containerRequestor); + } else { + containerRequestor = new RMContainerRequestor(this); + } containerRequestor.init(conf); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 4e094f20818..82ef24f4a08 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -79,7 +79,7 @@ public class RMContainerRequestor extends AbstractService //Value->Map //Key->Resource Capability //Value->ResourceRequest - private final Map>> + protected final Map>> remoteRequestsTable = new TreeMap>>(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java new file mode 100644 index 00000000000..7559693e4a4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java @@ -0,0 +1,224 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.rm; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * Keeps the data for RMContainer's reuse. + */ +public class RMContainerReuseRequestor extends RMContainerRequestor + implements EventHandler { + private static final Log LOG = LogFactory + .getLog(RMContainerReuseRequestor.class); + + private Map containersToReuse = + new ConcurrentHashMap(); + private Map> containerToTaskAttemptsMap = + new HashMap>(); + private int containerReuseMaxMapTasks; + private int containerReuseMaxReduceTasks; + private int maxMapTaskContainers; + private int maxReduceTaskContainers; + private int noOfMapTaskContainersForReuse; + private int noOfReduceTaskContainersForReuse; + + private RMCommunicator rmCommunicator; + + public RMContainerReuseRequestor( + EventHandler eventHandler, + RMCommunicator rmCommunicator) { + super(rmCommunicator); + this.rmCommunicator = rmCommunicator; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + containerReuseMaxMapTasks = conf.getInt( + MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKS, + MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_MAPTASKS); + containerReuseMaxReduceTasks = conf.getInt( + MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS, + MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS); + maxMapTaskContainers = conf.getInt( + MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS, + MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS); + maxReduceTaskContainers = conf.getInt( + MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS, + MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + public AllocateResponse makeRemoteRequest() + throws YarnException, IOException { + AllocateResponse amResponse = super.makeRemoteRequest(); + synchronized (containersToReuse) { + List allocatedContainers = amResponse.getAllocatedContainers(); + allocatedContainers.addAll(containersToReuse.keySet()); + containersToReuse.clear(); + } + return amResponse; + } + + @Override + public void containerFailedOnHost(String hostName) { + super.containerFailedOnHost(hostName); + boolean blacklisted = super.isNodeBlacklisted(hostName); + if (blacklisted) { + Set containersOnHost = new HashSet(); + for (Entry elem : containersToReuse.entrySet()) { + if (elem.getValue().equals(hostName)) { + containersOnHost.add(elem.getKey()); + } + } + for (Container container : containersOnHost) { + containersToReuse.remove(container); + } + } + } + + @Override + public void handle(ContainerAvailableEvent event) { + Container container = event.getContainer(); + ContainerId containerId = container.getId(); + String resourceName = container.getNodeId().getHost(); + boolean canReuse = false; + Priority priority = container.getPriority(); + if (RMContainerAllocator.PRIORITY_MAP.equals(priority) + || RMContainerAllocator.PRIORITY_REDUCE.equals(priority)) { + List containerTaskAttempts = null; + containerTaskAttempts = containerToTaskAttemptsMap.get(containerId); + if (containerTaskAttempts == null) { + containerTaskAttempts = new ArrayList(); + containerToTaskAttemptsMap.put(containerId, containerTaskAttempts); + } + if (checkMapContainerReuseConstraints(priority, containerTaskAttempts) + || checkReduceContainerReuseConstraints(priority, + containerTaskAttempts)) { + Map> resourceRequests = + remoteRequestsTable.get(priority); + // If there are any eligible requests + if (resourceRequests != null && !resourceRequests.isEmpty()) { + canReuse = true; + containerTaskAttempts.add(event.getTaskAttemptId()); + } + } + ((RMContainerAllocator) rmCommunicator) + .resetContainerForReuse(container.getId()); + if (canReuse) { + containersToReuse.put(container, resourceName); + incrementRunningReuseContainers(priority); + LOG.info("Adding the " + containerId + " for reuse."); + } else { + LOG.info("Releasing the container : " + containerId + + " since it is not eligible for reuse or no pending requests."); + containerComplete(container); + pendingRelease.add(containerId); + release(containerId); + } + } + } + + private boolean checkMapContainerReuseConstraints(Priority priority, + List containerTaskAttempts) { + return RMContainerAllocator.PRIORITY_MAP.equals(priority) + // Check for how many tasks can map task container run maximum + && ((containerTaskAttempts.size() < containerReuseMaxMapTasks + || containerReuseMaxMapTasks == -1) + // Check for no of map task containers running + && (noOfMapTaskContainersForReuse < maxMapTaskContainers + || maxMapTaskContainers == -1)); + } + + private boolean checkReduceContainerReuseConstraints(Priority priority, + List containerTaskAttempts) { + return RMContainerAllocator.PRIORITY_REDUCE.equals(priority) + // Check for how many tasks can reduce task container run maximum + && ((containerTaskAttempts.size() < containerReuseMaxReduceTasks + || containerReuseMaxReduceTasks == -1) + // Check for no of reduce task containers running + && (noOfReduceTaskContainersForReuse < maxReduceTaskContainers + || maxReduceTaskContainers == -1)); + } + + private void containerComplete(Container container) { + if (!containerToTaskAttemptsMap.containsKey(container.getId())) { + return; + } + containerToTaskAttemptsMap.remove(container.getId()); + if (RMContainerAllocator.PRIORITY_MAP.equals(container.getPriority())) { + noOfMapTaskContainersForReuse--; + } else if (RMContainerAllocator.PRIORITY_REDUCE + .equals(container.getPriority())) { + noOfReduceTaskContainersForReuse--; + } + } + + private void incrementRunningReuseContainers(Priority priority) { + if (RMContainerAllocator.PRIORITY_MAP.equals(priority)) { + noOfMapTaskContainersForReuse++; + } else if (RMContainerAllocator.PRIORITY_REDUCE.equals(priority)) { + noOfReduceTaskContainersForReuse++; + } + } + + @Private + @VisibleForTesting + Map getContainersToReuse() { + return containersToReuse; + } + + /** + * Container Available EventType. + */ + public static enum EventType { + CONTAINER_AVAILABLE + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java index efe150fad19..6c3ece693cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java @@ -209,7 +209,7 @@ public class MRAppBenchmark { ClientService clientService, AppContext context) { AMPreemptionPolicy policy = new NoopAMPreemptionPolicy(); - return new RMContainerAllocator(clientService, context, policy) { + return new RMContainerAllocator(clientService, context, policy, null) { @Override protected ApplicationMasterProtocol createSchedulerProxy() { return new ApplicationMasterProtocol() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 8127e86401b..4e23aee13ce 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -2027,7 +2027,8 @@ public class TestRMContainerAllocator { // Use this constructor when using a real job. MyContainerAllocator(MyResourceManager rm, ApplicationAttemptId appAttemptId, AppContext context) { - super(createMockClientService(), context, new NoopAMPreemptionPolicy()); + super(createMockClientService(), context, new NoopAMPreemptionPolicy(), + null); this.rm = rm; } @@ -2035,7 +2036,7 @@ public class TestRMContainerAllocator { public MyContainerAllocator(MyResourceManager rm, Configuration conf, ApplicationAttemptId appAttemptId, Job job) { super(createMockClientService(), createAppContext(appAttemptId, job), - new NoopAMPreemptionPolicy()); + new NoopAMPreemptionPolicy(), null); this.rm = rm; super.init(conf); super.start(); @@ -2045,7 +2046,7 @@ public class TestRMContainerAllocator { ApplicationAttemptId appAttemptId, Job job, Clock clock) { super(createMockClientService(), createAppContext(appAttemptId, job, clock), - new NoopAMPreemptionPolicy()); + new NoopAMPreemptionPolicy(), null); this.rm = rm; super.init(conf); super.start(); @@ -2166,7 +2167,7 @@ public class TestRMContainerAllocator { AMPreemptionPolicy policy = mock(AMPreemptionPolicy.class); when(communicator.getJob()).thenReturn(mockJob); RMContainerAllocator allocator = new RMContainerAllocator(service, context, - policy); + policy, null); AllocateResponse response = Records.newRecord(AllocateResponse.class); allocator.handleJobPriorityChange(response); } @@ -2369,7 +2370,7 @@ public class TestRMContainerAllocator { RMContainerAllocator allocator = new RMContainerAllocator( mock(ClientService.class), appContext, - new NoopAMPreemptionPolicy()) { + new NoopAMPreemptionPolicy(), null) { @Override protected void register() { } @@ -2420,8 +2421,8 @@ public class TestRMContainerAllocator { public void testCompletedContainerEvent() { RMContainerAllocator allocator = new RMContainerAllocator( mock(ClientService.class), mock(AppContext.class), - new NoopAMPreemptionPolicy()); - + new NoopAMPreemptionPolicy(), null); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId( MRBuilderUtils.newTaskId( MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1); @@ -3319,7 +3320,7 @@ public class TestRMContainerAllocator { extends RMContainerAllocator { public RMContainerAllocatorForFinishedContainer(ClientService clientService, AppContext context, AMPreemptionPolicy preemptionPolicy) { - super(clientService, context, preemptionPolicy); + super(clientService, context, preemptionPolicy, null); } @Override protected AssignedRequests createAssignedRequests() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java new file mode 100644 index 00000000000..d747e7463b2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java @@ -0,0 +1,307 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.rm; + +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest; +import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.EventType; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests for RMContainerReuseRequestor. + */ +public class TestRMContainerReuseRequestor { + + private RMContainerReuseRequestor reuseRequestor; + + @Before + public void setup() throws IOException { + reuseRequestor = new RMContainerReuseRequestor(null, + mock(RMContainerAllocator.class)); + } + + @Test + public void testNoOfTimesEachMapTaskContainerCanReuseWithDefaultConfig() { + // Verify that no of times each map task container can be reused with + // default configuration for + // 'yarn.app.mapreduce.am.container.reuse.max-maptasks'. + testNoOfTimesEachContainerCanReuseWithDefaultConfig(TaskType.MAP, + RMContainerAllocator.PRIORITY_MAP); + } + + @Test + public void testNoOfTimesEachMapTaskContainerCanReuseWithConfigLimit() { + // Verify that no of times each map task container can be reused when + // 'yarn.app.mapreduce.am.container.reuse.max-maptasks' configured with a + // value. + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKS, 1); + testNoOfTimesEachContainerCanReuseWithConfigLimit(TaskType.MAP, + RMContainerAllocator.PRIORITY_MAP, conf); + } + + @Test + public void testNoOfTimesEachRedTaskContainerCanReuseWithDefaultConfig() { + // Verify that no of times each reduce task container can be reused with + // default configuration for + // 'yarn.app.mapreduce.am.container.reuse.max-reducetasks'. + testNoOfTimesEachContainerCanReuseWithDefaultConfig(TaskType.REDUCE, + RMContainerAllocator.PRIORITY_REDUCE); + } + + @Test + public void testNoOfTimesEachRedTaskContainerCanReuseWithConfigLimit() { + // Verify that no of times each map task container can be reused when + // 'yarn.app.mapreduce.am.container.reuse.max-reducetasks' configured with a + // value. + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS, 1); + testNoOfTimesEachContainerCanReuseWithConfigLimit(TaskType.REDUCE, + RMContainerAllocator.PRIORITY_REDUCE, conf); + } + + @Test + public void testNoOfMaxMapTaskContainersCanReuseWithDefaultConfig() { + // Verify that no of maximum map containers can be reused at any time with + // default configuration for + // 'yarn.app.mapreduce.am.container.reuse.max-maptaskcontainers'. + testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType.MAP, + RMContainerAllocator.PRIORITY_MAP); + } + + @Test + public void testNoOfMaxMapTaskContainersCanReuseWithConfigLimit() { + // Verify that no of maximum map containers can be reused at any time when + // 'yarn.app.mapreduce.am.container.reuse.max-maptaskcontainers' configured + // with a limit value. + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS, 1); + testNoOfMaxContainersCanReuseWithConfigLimit(TaskType.MAP, + RMContainerAllocator.PRIORITY_MAP, conf); + } + + @Test + public void testNoOfMaxRedTaskContainersCanReuseWithDefaultConfig() { + // Verify that no of maximum reduce containers can be reused at any time + // with default configuration for + // 'yarn.app.mapreduce.am.container.reuse.max-reducetasks'. + testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType.REDUCE, + RMContainerAllocator.PRIORITY_REDUCE); + } + + @Test + public void testNoOfMaxRedTaskContainersCanReuseWithConfigLimit() { + // Verify that no of maximum reduce containers can be reused at any time + // when 'yarn.app.mapreduce.am.container.reuse.max-reducetasks' configured + // with a limit value. + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS, 1); + testNoOfMaxContainersCanReuseWithConfigLimit(TaskType.REDUCE, + RMContainerAllocator.PRIORITY_REDUCE, conf); + } + + @Test + public void testContainerFailedOnHost() throws Exception { + reuseRequestor.serviceInit(new Configuration()); + Map containersToReuse = reuseRequestor + .getContainersToReuse(); + containersToReuse + .put(newContainerInstance("container_1472171035081_0009_01_000008", + RMContainerAllocator.PRIORITY_REDUCE), "node1"); + containersToReuse + .put(newContainerInstance("container_1472171035081_0009_01_000009", + RMContainerAllocator.PRIORITY_REDUCE), "node2"); + reuseRequestor.getBlacklistedNodes().add("node1"); + // It removes all containers from containersToReuse running in node1 + reuseRequestor.containerFailedOnHost("node1"); + Assert.assertFalse("node1 should not present in reuse containers.", + containersToReuse.containsValue("node1")); + // There will not any change to containersToReuse when there are no + // containers to reuse in that node + reuseRequestor.containerFailedOnHost("node3"); + Assert.assertEquals(1, containersToReuse.size()); + } + + private void testNoOfTimesEachContainerCanReuseWithDefaultConfig( + TaskType taskType, Priority priority) { + // Verify that no of times each container can be reused + + // Add 10 container reqs to the requestor + addContainerReqs(priority); + Container container = newContainerInstance( + "container_123456789_0001_01_000002", priority); + for (int i = 0; i < 10; i++) { + JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, i + 1, taskType); + TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 1); + ContainerAvailableEvent event = new ContainerAvailableEvent( + EventType.CONTAINER_AVAILABLE, taskAttemptId, container); + reuseRequestor.handle(event); + Map containersToReuse = reuseRequestor + .getContainersToReuse(); + Assert.assertTrue("Container should be added for reuse.", + containersToReuse.containsKey(container)); + } + } + + private void testNoOfTimesEachContainerCanReuseWithConfigLimit( + TaskType taskType, Priority priority, Configuration conf) { + reuseRequestor.init(conf); + // Add a container request + ContainerRequest req1 = new ContainerRequest(null, + Resource.newInstance(2048, 1), new String[0], new String[0], priority, + null); + reuseRequestor.addContainerReq(req1); + // Add an another container request + ContainerRequest req2 = new ContainerRequest(null, + Resource.newInstance(2048, 1), new String[0], new String[0], priority, + null); + reuseRequestor.addContainerReq(req2); + + EventType eventType = EventType.CONTAINER_AVAILABLE; + Container container = newContainerInstance( + "container_123456789_0001_01_000002", priority); + JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1); + TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, 1, taskType); + TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1, 1); + + TaskId taskId2 = MRBuilderUtils.newTaskId(jobId, 2, taskType); + TaskAttemptId taskAttemptId2 = MRBuilderUtils.newTaskAttemptId(taskId2, 1); + + ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType, + taskAttemptId1, container); + reuseRequestor.handle(event1); + Map containersToReuse = reuseRequestor + .getContainersToReuse(); + // It is reusing the container + Assert.assertTrue("Container should be added for reuse.", + containersToReuse.containsKey(container)); + containersToReuse.clear(); + ContainerAvailableEvent event2 = new ContainerAvailableEvent(eventType, + taskAttemptId2, container); + reuseRequestor.handle(event2); + // It should not be reused since it has already reused and limit value is 1. + Assert.assertFalse("Container should not be added for reuse.", + containersToReuse.containsKey(container)); + } + + private void testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType taskType, + Priority priority) { + // It tests no of times each container can be reused + + // Add 10 container reqs to the requestor + addContainerReqs(priority); + for (int i = 0; i < 10; i++) { + Container container = newContainerInstance( + "container_123456789_0001_01_00000" + (i + 2), priority); + JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1); + TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, i + 1, taskType); + TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1, + 1); + ContainerAvailableEvent event1 = new ContainerAvailableEvent( + EventType.CONTAINER_AVAILABLE, taskAttemptId1, container); + reuseRequestor.handle(event1); + Map containersToReuse = reuseRequestor + .getContainersToReuse(); + Assert.assertTrue("Container should be added for reuse.", + containersToReuse.containsKey(container)); + } + } + + private void testNoOfMaxContainersCanReuseWithConfigLimit(TaskType taskType, + Priority priority, Configuration conf) { + reuseRequestor.init(conf); + ContainerRequest req1 = new ContainerRequest(null, + Resource.newInstance(2048, 1), new String[0], new String[0], priority, + null); + reuseRequestor.addContainerReq(req1); + + ContainerRequest req2 = new ContainerRequest(null, + Resource.newInstance(2048, 1), new String[0], new String[0], priority, + null); + reuseRequestor.addContainerReq(req2); + + EventType eventType = EventType.CONTAINER_AVAILABLE; + Container container1 = newContainerInstance( + "container_123456789_0001_01_000002", priority); + JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1); + TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, 1, taskType); + TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1, 1); + + TaskId taskId2 = MRBuilderUtils.newTaskId(jobId, 2, taskType); + TaskAttemptId taskAttemptId2 = MRBuilderUtils.newTaskAttemptId(taskId2, 1); + + ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType, + taskAttemptId1, container1); + reuseRequestor.handle(event1); + Map containersToReuse = reuseRequestor + .getContainersToReuse(); + Assert.assertTrue("Container should be added for reuse.", + containersToReuse.containsKey(container1)); + containersToReuse.clear(); + Container container2 = newContainerInstance( + "container_123456789_0001_01_000003", priority); + ContainerAvailableEvent event2 = new ContainerAvailableEvent(eventType, + taskAttemptId2, container2); + reuseRequestor.handle(event2); + Assert.assertFalse("Container should not be added for reuse.", + containersToReuse.containsKey(container2)); + } + + private void addContainerReqs(Priority priority) { + Configuration conf = new Configuration(); + reuseRequestor.init(conf); + for (int i = 0; i < 10; i++) { + ContainerRequest req = new ContainerRequest(null, + Resource.newInstance(2048, 1), new String[0], new String[0], priority, + null); + reuseRequestor.addContainerReq(req); + } + } + + private Container newContainerInstance(String containerId, + Priority priority) { + return Container.newInstance(ContainerId.fromString(containerId), + NodeId.newInstance("node1", 8080), "", null, priority, null); + } + + @After + public void tearDown() { + reuseRequestor.stop(); + } +}