From a73e1184379e5f2dca62c48b763cdcb54829d0bb Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 5 Aug 2016 10:43:35 -0700 Subject: [PATCH] YARN-4888. Changes in scheduler to identify resource-requests explicitly by allocation-id. (Subru Krishnan via wangda) (cherry picked from commit 3f100d76ff5df020dbb8ecd4f5b4f9736a0a8270) --- .../src/main/proto/yarn_protos.proto | 2 +- .../yarn/server/utils/BuilderUtils.java | 14 +- .../OpportunisticContainerAllocator.java | 3 +- .../scheduler/SchedulerRequestKey.java | 41 ++- .../allocator/RegularContainerAllocator.java | 15 +- .../scheduler/fair/FSAppAttempt.java | 11 +- .../scheduler/fifo/FifoScheduler.java | 7 +- .../yarn/server/resourcemanager/MockAM.java | 25 +- .../scheduler/TestAppSchedulingInfo.java | 37 ++- ...TestSchedulingWithAllocationRequestId.java | 274 ++++++++++++++++++ .../scheduler/capacity/TestUtils.java | 8 +- 11 files changed, 399 insertions(+), 38 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index c3d121a14d2..b5a7077f3ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -308,7 +308,7 @@ message ResourceRequestProto { optional bool relax_locality = 5 [default = true]; optional string node_label_expression = 6; optional ExecutionTypeRequestProto execution_type_request = 7; - optional int64 allocation_request_id = 8 [default = -1]; + optional int64 allocation_request_id = 8 [default = 0]; } message ExecutionTypeRequestProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 5aa1c417f31..8ecbea7d449 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -236,7 +236,8 @@ public class BuilderUtils { public static Container newContainer(ContainerId containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority, - Token containerToken, ExecutionType executionType) { + Token containerToken, ExecutionType executionType, + long allocationRequestId) { Container container = recordFactory.newRecordInstance(Container.class); container.setId(containerId); container.setNodeId(nodeId); @@ -245,6 +246,7 @@ public class BuilderUtils { container.setPriority(priority); container.setContainerToken(containerToken); container.setExecutionType(executionType); + container.setAllocationRequestId(allocationRequestId); return container; } @@ -252,7 +254,15 @@ public class BuilderUtils { String nodeHttpAddress, Resource resource, Priority priority, Token containerToken) { return newContainer(containerId, nodeId, nodeHttpAddress, resource, - priority, containerToken, ExecutionType.GUARANTEED); + priority, containerToken, ExecutionType.GUARANTEED, 0); + } + + public static Container newContainer(ContainerId containerId, NodeId nodeId, + String nodeHttpAddress, Resource resource, Priority priority, + Token containerToken, long allocationRequestId) { + return newContainer(containerId, nodeId, nodeHttpAddress, resource, + priority, containerToken, ExecutionType.GUARANTEED, + allocationRequestId); } public static T newToken(Class tokenClass, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java index ce5bda080d7..4723233ab1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java @@ -163,7 +163,8 @@ public class OpportunisticContainerAllocator { Container container = BuilderUtils.newContainer( cId, nodeId, nodeId.getHost() + ":" + webpagePort, capability, rr.getPriority(), containerToken, - containerTokenIdentifier.getExecutionType()); + containerTokenIdentifier.getExecutionType(), + rr.getAllocationRequestId()); return container; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java index b4988be9a80..4b640aeab31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java @@ -30,9 +30,7 @@ public final class SchedulerRequestKey implements Comparable { private final Priority priority; - - public static final SchedulerRequestKey UNDEFINED = - new SchedulerRequestKey(Priority.UNDEFINED); + private final long allocationRequestId; /** * Factory method to generate a SchedulerRequestKey from a ResourceRequest. @@ -40,7 +38,8 @@ public final class SchedulerRequestKey implements * @return SchedulerRequestKey */ public static SchedulerRequestKey create(ResourceRequest req) { - return new SchedulerRequestKey(req.getPriority()); + return new SchedulerRequestKey(req.getPriority(), + req.getAllocationRequestId()); } /** @@ -50,11 +49,13 @@ public final class SchedulerRequestKey implements * @return SchedulerRequestKey */ public static SchedulerRequestKey extractFrom(Container container) { - return new SchedulerRequestKey(container.getPriority()); + return new SchedulerRequestKey(container.getPriority(), + container.getAllocationRequestId()); } - private SchedulerRequestKey(Priority priority) { + private SchedulerRequestKey(Priority priority, long allocationRequestId) { this.priority = priority; + this.allocationRequestId = allocationRequestId; } /** @@ -66,6 +67,15 @@ public final class SchedulerRequestKey implements return priority; } + /** + * Get the Id of the associated {@link ResourceRequest}. + * + * @return the Id of the associated {@link ResourceRequest} + */ + public long getAllocationRequestId() { + return allocationRequestId; + } + @Override public int compareTo(SchedulerRequestKey o) { if (o == null) { @@ -75,7 +85,12 @@ public final class SchedulerRequestKey implements return 1; } } - return o.getPriority().compareTo(priority); + int priorityCompare = o.getPriority().compareTo(priority); + // we first sort by priority and then by allocationRequestId + if (priorityCompare != 0) { + return priorityCompare; + } + return Long.compare(allocationRequestId, o.getAllocationRequestId()); } @Override @@ -88,12 +103,20 @@ public final class SchedulerRequestKey implements } SchedulerRequestKey that = (SchedulerRequestKey) o; - return getPriority().equals(that.getPriority()); + if (getAllocationRequestId() != that.getAllocationRequestId()) { + return false; + } + return getPriority() != null ? + getPriority().equals(that.getPriority()) : + that.getPriority() == null; } @Override public int hashCode() { - return getPriority().hashCode(); + int result = getPriority() != null ? getPriority().hashCode() : 0; + result = 31 * result + (int) (getAllocationRequestId() ^ ( + getAllocationRequestId() >>> 32)); + return result; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 29b37d8e362..111a1fef463 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; +import java.util.ArrayList; +import java.util.List; + import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,10 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; - - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; @@ -46,9 +46,6 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import java.util.ArrayList; -import java.util.List; - /** * Allocate normal (new) containers, considers locality/label, etc. Using * delayed scheduling mechanism to get better locality allocation. @@ -611,8 +608,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { application.getNewContainerId()); // Create the container - return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, schedulerKey.getPriority(), null); + return BuilderUtils.newContainer(containerId, nodeId, + node.getRMNode().getHttpAddress(), capability, + schedulerKey.getPriority(), null, + schedulerKey.getAllocationRequestId()); } private ContainerAllocation handleNewContainerAllocation( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 8f074cdc42a..9e5a807f9d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.Serializable; import java.text.DecimalFormat; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; @@ -55,8 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -495,9 +495,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt getApplicationAttemptId(), getNewContainerId()); // Create the container - Container container = - BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, schedulerKey.getPriority(), null); + Container container = BuilderUtils.newContainer(containerId, nodeId, + node.getRMNode().getHttpAddress(), capability, + schedulerKey.getPriority(), null, + schedulerKey.getAllocationRequestId()); return container; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index fe8d0afaae8..2863a97c871 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -710,9 +710,10 @@ public class FifoScheduler extends .getApplicationAttemptId(), application.getNewContainerId()); // Create the container - Container container = - BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, schedulerKey.getPriority(), null); + Container container = BuilderUtils.newContainer(containerId, nodeId, + node.getRMNode().getHttpAddress(), capability, + schedulerKey.getPriority(), null, + schedulerKey.getAllocationRequestId()); // Allocate! diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 8f6a6c1a660..1b11472cbb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -128,7 +128,13 @@ public class MockAM { public void addRequests(String[] hosts, int memory, int priority, int containers) throws Exception { - requests.addAll(createReq(hosts, memory, priority, containers)); + addRequests(hosts, memory, priority, containers, 0L); + } + + public void addRequests(String[] hosts, int memory, int priority, + int containers, long allocationRequestId) throws Exception { + requests.addAll( + createReq(hosts, memory, priority, containers, allocationRequestId)); } public AllocateResponse schedule() throws Exception { @@ -159,17 +165,19 @@ public class MockAM { List releases, String labelExpression) throws Exception { List reqs = createReq(new String[] { host }, memory, priority, numContainers, - labelExpression); + labelExpression, 0L); return allocate(reqs, releases); } - public List createReq(String[] hosts, int memory, int priority, - int containers) throws Exception { - return createReq(hosts, memory, priority, containers, null); + public List createReq(String[] hosts, int memory, + int priority, int containers, long allocationRequestId) throws Exception { + return createReq(hosts, memory, priority, containers, null, + allocationRequestId); } - public List createReq(String[] hosts, int memory, int priority, - int containers, String labelExpression) throws Exception { + public List createReq(String[] hosts, int memory, + int priority, int containers, String labelExpression, + long allocationRequestId) throws Exception { List reqs = new ArrayList(); if (hosts != null) { for (String host : hosts) { @@ -178,10 +186,12 @@ public class MockAM { ResourceRequest hostReq = createResourceReq(host, memory, priority, containers, labelExpression); + hostReq.setAllocationRequestId(allocationRequestId); reqs.add(hostReq); ResourceRequest rackReq = createResourceReq("/default-rack", memory, priority, containers, labelExpression); + rackReq.setAllocationRequestId(allocationRequestId); reqs.add(rackReq); } } @@ -189,6 +199,7 @@ public class MockAM { ResourceRequest offRackReq = createResourceReq(ResourceRequest.ANY, memory, priority, containers, labelExpression); + offRackReq.setAllocationRequestId(allocationRequestId); reqs.add(offRackReq); return reqs; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index a1c6294e222..503ea34b06b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -18,12 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + import java.util.ArrayList; +import java.util.Iterator; +import java.util.TreeSet; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.junit.Assert; import org.junit.Test; @@ -70,4 +75,34 @@ public class TestAppSchedulingInfo { blacklistRemovals); Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged()); } + + @Test + public void testSchedulerRequestKeyOrdering() { + TreeSet ts = new TreeSet<>(); + ts.add(TestUtils.toSchedulerKey(Priority.newInstance(1), 1)); + ts.add(TestUtils.toSchedulerKey(Priority.newInstance(1), 2)); + ts.add(TestUtils.toSchedulerKey(Priority.newInstance(0), 4)); + ts.add(TestUtils.toSchedulerKey(Priority.newInstance(0), 3)); + ts.add(TestUtils.toSchedulerKey(Priority.newInstance(2), 5)); + ts.add(TestUtils.toSchedulerKey(Priority.newInstance(2), 6)); + Iterator iter = ts.iterator(); + SchedulerRequestKey sk = iter.next(); + Assert.assertEquals(0, sk.getPriority().getPriority()); + Assert.assertEquals(3, sk.getAllocationRequestId()); + sk = iter.next(); + Assert.assertEquals(0, sk.getPriority().getPriority()); + Assert.assertEquals(4, sk.getAllocationRequestId()); + sk = iter.next(); + Assert.assertEquals(1, sk.getPriority().getPriority()); + Assert.assertEquals(1, sk.getAllocationRequestId()); + sk = iter.next(); + Assert.assertEquals(1, sk.getPriority().getPriority()); + Assert.assertEquals(2, sk.getAllocationRequestId()); + sk = iter.next(); + Assert.assertEquals(2, sk.getPriority().getPriority()); + Assert.assertEquals(5, sk.getAllocationRequestId()); + sk = iter.next(); + Assert.assertEquals(2, sk.getPriority().getPriority()); + Assert.assertEquals(6, sk.getAllocationRequestId()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java new file mode 100644 index 00000000000..e60fd6f889a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulingWithAllocationRequestId.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for checking Scheduling with allocationRequestId, i.e. mapping of + * allocated containers to the original client {@code ResourceRequest}. + */ +public class TestSchedulingWithAllocationRequestId + extends ParameterizedSchedulerTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(TestSchedulingWithAllocationRequestId.class); + private static final int GB = 1024; + + @Test + public void testMultipleAllocationRequestIds() throws Exception { + configureScheduler(); + YarnConfiguration conf = getConf(); + MockRM rm = new MockRM(conf); + try { + rm.start(); + + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB); + MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB); + RMApp app1 = rm.submitApp(2048); + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + // add request for containers with id 10 & 20 + am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 1, 1, 10L); + AllocateResponse allocResponse = am1.schedule(); // send the request + am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L); + allocResponse = am1.schedule(); // send the request + + // check if request id 10 is satisfied + nm1.nodeHeartbeat(true); + allocResponse = am1.schedule(); // send the request + while (allocResponse.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + allocResponse = am1.schedule(); + } + List allocated = allocResponse.getAllocatedContainers(); + Assert.assertEquals(1, allocated.size()); + checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10); + + // check now if request id 20 is satisfied + nm2.nodeHeartbeat(true); + while (allocResponse.getAllocatedContainers().size() < 2) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + allocResponse = am1.schedule(); + } + + allocated = allocResponse.getAllocatedContainers(); + Assert.assertEquals(2, allocated.size()); + for (Container container : allocated) { + checkAllocatedContainer(container, 2 * GB, nm2.getNodeId(), 20); + } + } finally { + if (rm != null) { + rm.stop(); + } + } + } + + @Test + public void testMultipleAllocationRequestDiffPriority() throws Exception { + configureScheduler(); + YarnConfiguration conf = getConf(); + MockRM rm = new MockRM(conf); + try { + rm.start(); + + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB); + MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB); + RMApp app1 = rm.submitApp(2048); + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + // add request for containers with id 10 & 20 + am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 2, 1, 10L); + AllocateResponse allocResponse = am1.schedule(); // send the request + am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L); + allocResponse = am1.schedule(); // send the request + + // check if request id 20 is satisfied first + nm2.nodeHeartbeat(true); + while (allocResponse.getAllocatedContainers().size() < 2) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + allocResponse = am1.schedule(); + } + + List allocated = allocResponse.getAllocatedContainers(); + Assert.assertEquals(2, allocated.size()); + for (Container container : allocated) { + checkAllocatedContainer(container, 2 * GB, nm2.getNodeId(), 20); + } + + // check now if request id 10 is satisfied + nm1.nodeHeartbeat(true); + allocResponse = am1.schedule(); // send the request + while (allocResponse.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + allocResponse = am1.schedule(); + } + allocated = allocResponse.getAllocatedContainers(); + Assert.assertEquals(1, allocated.size()); + checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10); + } finally { + if (rm != null) { + rm.stop(); + } + } + } + + private void checkAllocatedContainer(Container allocated, int memory, + NodeId nodeId, long allocationRequestId) { + Assert.assertEquals(memory, allocated.getResource().getMemorySize()); + Assert.assertEquals(nodeId, allocated.getNodeId()); + Assert.assertEquals(allocationRequestId, + allocated.getAllocationRequestId()); + } + + @Test + public void testMultipleAppsWithAllocationReqId() throws Exception { + configureScheduler(); + YarnConfiguration conf = getConf(); + MockRM rm = new MockRM(conf); + try { + rm.start(); + + // Register node1 + String host0 = "host_0"; + String host1 = "host_1"; + MockNM nm1 = + new MockNM(host0 + ":1234", 8 * GB, rm.getResourceTrackerService()); + nm1.registerNode(); + + // Register node2 + MockNM nm2 = + new MockNM(host1 + ":2351", 8 * GB, rm.getResourceTrackerService()); + nm2.registerNode(); + + // submit 1st app + RMApp app1 = rm.submitApp(1 * GB, "user_0", "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // Submit app1 RR with allocationReqId = 5 + int numContainers = 1; + am1.addRequests(new String[] {host0, host1 }, 1 * GB, 1, numContainers, + 5L); + AllocateResponse allocResponse = am1.schedule(); + + // wait for containers to be allocated. + nm1.nodeHeartbeat(true); + allocResponse = am1.schedule(); // send the request + while (allocResponse.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + allocResponse = am1.schedule(); + } + + List allocated = allocResponse.getAllocatedContainers(); + Assert.assertEquals(1, allocated.size()); + checkAllocatedContainer(allocated.get(0), 1 * GB, nm1.getNodeId(), 5L); + + // Submit another application + RMApp app2 = rm.submitApp(1 * GB, "user_1", "a2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + // Submit app2 RR with allocationReqId = 5 + am2.addRequests(new String[] {host0, host1 }, 2 * GB, 1, numContainers, + 5L); + am2.schedule(); + + // wait for containers to be allocated. + nm2.nodeHeartbeat(true); + allocResponse = am2.schedule(); // send the request + while (allocResponse.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + allocResponse = am2.schedule(); + } + + allocated = allocResponse.getAllocatedContainers(); + Assert.assertEquals(1, allocated.size()); + checkAllocatedContainer(allocated.get(0), 2 * GB, nm2.getNodeId(), 5L); + + // Now submit app2 RR with allocationReqId = 10 + am2.addRequests(new String[] {host0, host1 }, 3 * GB, 1, numContainers, + 10L); + am2.schedule(); + + // wait for containers to be allocated. + nm1.nodeHeartbeat(true); + allocResponse = am2.schedule(); // send the request + while (allocResponse.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + allocResponse = am2.schedule(); + } + + allocated = allocResponse.getAllocatedContainers(); + Assert.assertEquals(1, allocated.size()); + checkAllocatedContainer(allocated.get(0), 3 * GB, nm1.getNodeId(), 10L); + + // Now submit app1 RR with allocationReqId = 10 + am1.addRequests(new String[] {host0, host1 }, 4 * GB, 1, numContainers, + 10L); + am1.schedule(); + + // wait for containers to be allocated. + nm2.nodeHeartbeat(true); + allocResponse = am1.schedule(); // send the request + while (allocResponse.getAllocatedContainers().size() < 1) { + LOG.info("Waiting for containers to be created for app 1..."); + Thread.sleep(100); + allocResponse = am1.schedule(); + } + + allocated = allocResponse.getAllocatedContainers(); + Assert.assertEquals(1, allocated.size()); + checkAllocatedContainer(allocated.get(0), 4 * GB, nm2.getNodeId(), 10L); + } finally { + if (rm != null) { + rm.stop(); + } + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index c808b5a260c..66e833f1798 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -412,4 +411,11 @@ public class TestUtils { return SchedulerRequestKey.create(ResourceRequest.newInstance( Priority.newInstance(pri), null, null, 0)); } + + public static SchedulerRequestKey toSchedulerKey(Priority pri, + long allocationRequestId) { + ResourceRequest req = ResourceRequest.newInstance(pri, null, null, 0); + req.setAllocationRequestId(allocationRequestId); + return SchedulerRequestKey.create(req); + } }