YARN-4888. Changes in scheduler to identify resource-requests explicitly by allocation-id. (Subru Krishnan via wangda)
(cherry picked from commit 3f100d76ff
)
This commit is contained in:
parent
69da77c357
commit
a73e118437
|
@ -308,7 +308,7 @@ message ResourceRequestProto {
|
|||
optional bool relax_locality = 5 [default = true];
|
||||
optional string node_label_expression = 6;
|
||||
optional ExecutionTypeRequestProto execution_type_request = 7;
|
||||
optional int64 allocation_request_id = 8 [default = -1];
|
||||
optional int64 allocation_request_id = 8 [default = 0];
|
||||
}
|
||||
|
||||
message ExecutionTypeRequestProto {
|
||||
|
|
|
@ -236,7 +236,8 @@ public class BuilderUtils {
|
|||
|
||||
public static Container newContainer(ContainerId containerId, NodeId nodeId,
|
||||
String nodeHttpAddress, Resource resource, Priority priority,
|
||||
Token containerToken, ExecutionType executionType) {
|
||||
Token containerToken, ExecutionType executionType,
|
||||
long allocationRequestId) {
|
||||
Container container = recordFactory.newRecordInstance(Container.class);
|
||||
container.setId(containerId);
|
||||
container.setNodeId(nodeId);
|
||||
|
@ -245,6 +246,7 @@ public class BuilderUtils {
|
|||
container.setPriority(priority);
|
||||
container.setContainerToken(containerToken);
|
||||
container.setExecutionType(executionType);
|
||||
container.setAllocationRequestId(allocationRequestId);
|
||||
return container;
|
||||
}
|
||||
|
||||
|
@ -252,7 +254,15 @@ public class BuilderUtils {
|
|||
String nodeHttpAddress, Resource resource, Priority priority,
|
||||
Token containerToken) {
|
||||
return newContainer(containerId, nodeId, nodeHttpAddress, resource,
|
||||
priority, containerToken, ExecutionType.GUARANTEED);
|
||||
priority, containerToken, ExecutionType.GUARANTEED, 0);
|
||||
}
|
||||
|
||||
public static Container newContainer(ContainerId containerId, NodeId nodeId,
|
||||
String nodeHttpAddress, Resource resource, Priority priority,
|
||||
Token containerToken, long allocationRequestId) {
|
||||
return newContainer(containerId, nodeId, nodeHttpAddress, resource,
|
||||
priority, containerToken, ExecutionType.GUARANTEED,
|
||||
allocationRequestId);
|
||||
}
|
||||
|
||||
public static <T extends Token> T newToken(Class<T> tokenClass,
|
||||
|
|
|
@ -163,7 +163,8 @@ public class OpportunisticContainerAllocator {
|
|||
Container container = BuilderUtils.newContainer(
|
||||
cId, nodeId, nodeId.getHost() + ":" + webpagePort,
|
||||
capability, rr.getPriority(), containerToken,
|
||||
containerTokenIdentifier.getExecutionType());
|
||||
containerTokenIdentifier.getExecutionType(),
|
||||
rr.getAllocationRequestId());
|
||||
return container;
|
||||
}
|
||||
|
||||
|
|
|
@ -30,9 +30,7 @@ public final class SchedulerRequestKey implements
|
|||
Comparable<SchedulerRequestKey> {
|
||||
|
||||
private final Priority priority;
|
||||
|
||||
public static final SchedulerRequestKey UNDEFINED =
|
||||
new SchedulerRequestKey(Priority.UNDEFINED);
|
||||
private final long allocationRequestId;
|
||||
|
||||
/**
|
||||
* Factory method to generate a SchedulerRequestKey from a ResourceRequest.
|
||||
|
@ -40,7 +38,8 @@ public final class SchedulerRequestKey implements
|
|||
* @return SchedulerRequestKey
|
||||
*/
|
||||
public static SchedulerRequestKey create(ResourceRequest req) {
|
||||
return new SchedulerRequestKey(req.getPriority());
|
||||
return new SchedulerRequestKey(req.getPriority(),
|
||||
req.getAllocationRequestId());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -50,11 +49,13 @@ public final class SchedulerRequestKey implements
|
|||
* @return SchedulerRequestKey
|
||||
*/
|
||||
public static SchedulerRequestKey extractFrom(Container container) {
|
||||
return new SchedulerRequestKey(container.getPriority());
|
||||
return new SchedulerRequestKey(container.getPriority(),
|
||||
container.getAllocationRequestId());
|
||||
}
|
||||
|
||||
private SchedulerRequestKey(Priority priority) {
|
||||
private SchedulerRequestKey(Priority priority, long allocationRequestId) {
|
||||
this.priority = priority;
|
||||
this.allocationRequestId = allocationRequestId;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -66,6 +67,15 @@ public final class SchedulerRequestKey implements
|
|||
return priority;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the Id of the associated {@link ResourceRequest}.
|
||||
*
|
||||
* @return the Id of the associated {@link ResourceRequest}
|
||||
*/
|
||||
public long getAllocationRequestId() {
|
||||
return allocationRequestId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(SchedulerRequestKey o) {
|
||||
if (o == null) {
|
||||
|
@ -75,7 +85,12 @@ public final class SchedulerRequestKey implements
|
|||
return 1;
|
||||
}
|
||||
}
|
||||
return o.getPriority().compareTo(priority);
|
||||
int priorityCompare = o.getPriority().compareTo(priority);
|
||||
// we first sort by priority and then by allocationRequestId
|
||||
if (priorityCompare != 0) {
|
||||
return priorityCompare;
|
||||
}
|
||||
return Long.compare(allocationRequestId, o.getAllocationRequestId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -88,12 +103,20 @@ public final class SchedulerRequestKey implements
|
|||
}
|
||||
|
||||
SchedulerRequestKey that = (SchedulerRequestKey) o;
|
||||
return getPriority().equals(that.getPriority());
|
||||
|
||||
if (getAllocationRequestId() != that.getAllocationRequestId()) {
|
||||
return false;
|
||||
}
|
||||
return getPriority() != null ?
|
||||
getPriority().equals(that.getPriority()) :
|
||||
that.getPriority() == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getPriority().hashCode();
|
||||
int result = getPriority() != null ? getPriority().hashCode() : 0;
|
||||
result = 31 * result + (int) (getAllocationRequestId() ^ (
|
||||
getAllocationRequestId() >>> 32));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -32,10 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
||||
|
@ -46,9 +46,6 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Allocate normal (new) containers, considers locality/label, etc. Using
|
||||
* delayed scheduling mechanism to get better locality allocation.
|
||||
|
@ -611,8 +608,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|||
application.getNewContainerId());
|
||||
|
||||
// Create the container
|
||||
return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||
.getHttpAddress(), capability, schedulerKey.getPriority(), null);
|
||||
return BuilderUtils.newContainer(containerId, nodeId,
|
||||
node.getRMNode().getHttpAddress(), capability,
|
||||
schedulerKey.getPriority(), null,
|
||||
schedulerKey.getAllocationRequestId());
|
||||
}
|
||||
|
||||
private ContainerAllocation handleNewContainerAllocation(
|
||||
|
|
|
@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|||
|
||||
import java.io.Serializable;
|
||||
import java.text.DecimalFormat;
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
|
@ -55,8 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
@ -495,9 +495,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|||
getApplicationAttemptId(), getNewContainerId());
|
||||
|
||||
// Create the container
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||
.getHttpAddress(), capability, schedulerKey.getPriority(), null);
|
||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||
node.getRMNode().getHttpAddress(), capability,
|
||||
schedulerKey.getPriority(), null,
|
||||
schedulerKey.getAllocationRequestId());
|
||||
|
||||
return container;
|
||||
}
|
||||
|
|
|
@ -710,9 +710,10 @@ public class FifoScheduler extends
|
|||
.getApplicationAttemptId(), application.getNewContainerId());
|
||||
|
||||
// Create the container
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||
.getHttpAddress(), capability, schedulerKey.getPriority(), null);
|
||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||
node.getRMNode().getHttpAddress(), capability,
|
||||
schedulerKey.getPriority(), null,
|
||||
schedulerKey.getAllocationRequestId());
|
||||
|
||||
// Allocate!
|
||||
|
||||
|
|
|
@ -128,7 +128,13 @@ public class MockAM {
|
|||
|
||||
public void addRequests(String[] hosts, int memory, int priority,
|
||||
int containers) throws Exception {
|
||||
requests.addAll(createReq(hosts, memory, priority, containers));
|
||||
addRequests(hosts, memory, priority, containers, 0L);
|
||||
}
|
||||
|
||||
public void addRequests(String[] hosts, int memory, int priority,
|
||||
int containers, long allocationRequestId) throws Exception {
|
||||
requests.addAll(
|
||||
createReq(hosts, memory, priority, containers, allocationRequestId));
|
||||
}
|
||||
|
||||
public AllocateResponse schedule() throws Exception {
|
||||
|
@ -159,17 +165,19 @@ public class MockAM {
|
|||
List<ContainerId> releases, String labelExpression) throws Exception {
|
||||
List<ResourceRequest> reqs =
|
||||
createReq(new String[] { host }, memory, priority, numContainers,
|
||||
labelExpression);
|
||||
labelExpression, 0L);
|
||||
return allocate(reqs, releases);
|
||||
}
|
||||
|
||||
public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
|
||||
int containers) throws Exception {
|
||||
return createReq(hosts, memory, priority, containers, null);
|
||||
public List<ResourceRequest> createReq(String[] hosts, int memory,
|
||||
int priority, int containers, long allocationRequestId) throws Exception {
|
||||
return createReq(hosts, memory, priority, containers, null,
|
||||
allocationRequestId);
|
||||
}
|
||||
|
||||
public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
|
||||
int containers, String labelExpression) throws Exception {
|
||||
public List<ResourceRequest> createReq(String[] hosts, int memory,
|
||||
int priority, int containers, String labelExpression,
|
||||
long allocationRequestId) throws Exception {
|
||||
List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
|
||||
if (hosts != null) {
|
||||
for (String host : hosts) {
|
||||
|
@ -178,10 +186,12 @@ public class MockAM {
|
|||
ResourceRequest hostReq =
|
||||
createResourceReq(host, memory, priority, containers,
|
||||
labelExpression);
|
||||
hostReq.setAllocationRequestId(allocationRequestId);
|
||||
reqs.add(hostReq);
|
||||
ResourceRequest rackReq =
|
||||
createResourceReq("/default-rack", memory, priority, containers,
|
||||
labelExpression);
|
||||
rackReq.setAllocationRequestId(allocationRequestId);
|
||||
reqs.add(rackReq);
|
||||
}
|
||||
}
|
||||
|
@ -189,6 +199,7 @@ public class MockAM {
|
|||
|
||||
ResourceRequest offRackReq = createResourceReq(ResourceRequest.ANY, memory,
|
||||
priority, containers, labelExpression);
|
||||
offRackReq.setAllocationRequestId(allocationRequestId);
|
||||
reqs.add(offRackReq);
|
||||
return reqs;
|
||||
}
|
||||
|
|
|
@ -18,12 +18,17 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -70,4 +75,34 @@ public class TestAppSchedulingInfo {
|
|||
blacklistRemovals);
|
||||
Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSchedulerRequestKeyOrdering() {
|
||||
TreeSet<SchedulerRequestKey> ts = new TreeSet<>();
|
||||
ts.add(TestUtils.toSchedulerKey(Priority.newInstance(1), 1));
|
||||
ts.add(TestUtils.toSchedulerKey(Priority.newInstance(1), 2));
|
||||
ts.add(TestUtils.toSchedulerKey(Priority.newInstance(0), 4));
|
||||
ts.add(TestUtils.toSchedulerKey(Priority.newInstance(0), 3));
|
||||
ts.add(TestUtils.toSchedulerKey(Priority.newInstance(2), 5));
|
||||
ts.add(TestUtils.toSchedulerKey(Priority.newInstance(2), 6));
|
||||
Iterator<SchedulerRequestKey> iter = ts.iterator();
|
||||
SchedulerRequestKey sk = iter.next();
|
||||
Assert.assertEquals(0, sk.getPriority().getPriority());
|
||||
Assert.assertEquals(3, sk.getAllocationRequestId());
|
||||
sk = iter.next();
|
||||
Assert.assertEquals(0, sk.getPriority().getPriority());
|
||||
Assert.assertEquals(4, sk.getAllocationRequestId());
|
||||
sk = iter.next();
|
||||
Assert.assertEquals(1, sk.getPriority().getPriority());
|
||||
Assert.assertEquals(1, sk.getAllocationRequestId());
|
||||
sk = iter.next();
|
||||
Assert.assertEquals(1, sk.getPriority().getPriority());
|
||||
Assert.assertEquals(2, sk.getAllocationRequestId());
|
||||
sk = iter.next();
|
||||
Assert.assertEquals(2, sk.getPriority().getPriority());
|
||||
Assert.assertEquals(5, sk.getAllocationRequestId());
|
||||
sk = iter.next();
|
||||
Assert.assertEquals(2, sk.getPriority().getPriority());
|
||||
Assert.assertEquals(6, sk.getAllocationRequestId());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,274 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Tests for checking Scheduling with allocationRequestId, i.e. mapping of
|
||||
* allocated containers to the original client {@code ResourceRequest}.
|
||||
*/
|
||||
public class TestSchedulingWithAllocationRequestId
|
||||
extends ParameterizedSchedulerTestBase {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestSchedulingWithAllocationRequestId.class);
|
||||
private static final int GB = 1024;
|
||||
|
||||
@Test
|
||||
public void testMultipleAllocationRequestIds() throws Exception {
|
||||
configureScheduler();
|
||||
YarnConfiguration conf = getConf();
|
||||
MockRM rm = new MockRM(conf);
|
||||
try {
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
|
||||
MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB);
|
||||
RMApp app1 = rm.submitApp(2048);
|
||||
// kick the scheduling
|
||||
nm1.nodeHeartbeat(true);
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
|
||||
// add request for containers with id 10 & 20
|
||||
am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 1, 1, 10L);
|
||||
AllocateResponse allocResponse = am1.schedule(); // send the request
|
||||
am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L);
|
||||
allocResponse = am1.schedule(); // send the request
|
||||
|
||||
// check if request id 10 is satisfied
|
||||
nm1.nodeHeartbeat(true);
|
||||
allocResponse = am1.schedule(); // send the request
|
||||
while (allocResponse.getAllocatedContainers().size() < 1) {
|
||||
LOG.info("Waiting for containers to be created for app 1...");
|
||||
Thread.sleep(100);
|
||||
allocResponse = am1.schedule();
|
||||
}
|
||||
List<Container> allocated = allocResponse.getAllocatedContainers();
|
||||
Assert.assertEquals(1, allocated.size());
|
||||
checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
|
||||
|
||||
// check now if request id 20 is satisfied
|
||||
nm2.nodeHeartbeat(true);
|
||||
while (allocResponse.getAllocatedContainers().size() < 2) {
|
||||
LOG.info("Waiting for containers to be created for app 1...");
|
||||
Thread.sleep(100);
|
||||
allocResponse = am1.schedule();
|
||||
}
|
||||
|
||||
allocated = allocResponse.getAllocatedContainers();
|
||||
Assert.assertEquals(2, allocated.size());
|
||||
for (Container container : allocated) {
|
||||
checkAllocatedContainer(container, 2 * GB, nm2.getNodeId(), 20);
|
||||
}
|
||||
} finally {
|
||||
if (rm != null) {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleAllocationRequestDiffPriority() throws Exception {
|
||||
configureScheduler();
|
||||
YarnConfiguration conf = getConf();
|
||||
MockRM rm = new MockRM(conf);
|
||||
try {
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
|
||||
MockNM nm2 = rm.registerNode("127.0.0.2:5678", 4 * GB);
|
||||
RMApp app1 = rm.submitApp(2048);
|
||||
// kick the scheduling
|
||||
nm1.nodeHeartbeat(true);
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
|
||||
// add request for containers with id 10 & 20
|
||||
am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 2, 1, 10L);
|
||||
AllocateResponse allocResponse = am1.schedule(); // send the request
|
||||
am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L);
|
||||
allocResponse = am1.schedule(); // send the request
|
||||
|
||||
// check if request id 20 is satisfied first
|
||||
nm2.nodeHeartbeat(true);
|
||||
while (allocResponse.getAllocatedContainers().size() < 2) {
|
||||
LOG.info("Waiting for containers to be created for app 1...");
|
||||
Thread.sleep(100);
|
||||
allocResponse = am1.schedule();
|
||||
}
|
||||
|
||||
List<Container> allocated = allocResponse.getAllocatedContainers();
|
||||
Assert.assertEquals(2, allocated.size());
|
||||
for (Container container : allocated) {
|
||||
checkAllocatedContainer(container, 2 * GB, nm2.getNodeId(), 20);
|
||||
}
|
||||
|
||||
// check now if request id 10 is satisfied
|
||||
nm1.nodeHeartbeat(true);
|
||||
allocResponse = am1.schedule(); // send the request
|
||||
while (allocResponse.getAllocatedContainers().size() < 1) {
|
||||
LOG.info("Waiting for containers to be created for app 1...");
|
||||
Thread.sleep(100);
|
||||
allocResponse = am1.schedule();
|
||||
}
|
||||
allocated = allocResponse.getAllocatedContainers();
|
||||
Assert.assertEquals(1, allocated.size());
|
||||
checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
|
||||
} finally {
|
||||
if (rm != null) {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkAllocatedContainer(Container allocated, int memory,
|
||||
NodeId nodeId, long allocationRequestId) {
|
||||
Assert.assertEquals(memory, allocated.getResource().getMemorySize());
|
||||
Assert.assertEquals(nodeId, allocated.getNodeId());
|
||||
Assert.assertEquals(allocationRequestId,
|
||||
allocated.getAllocationRequestId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleAppsWithAllocationReqId() throws Exception {
|
||||
configureScheduler();
|
||||
YarnConfiguration conf = getConf();
|
||||
MockRM rm = new MockRM(conf);
|
||||
try {
|
||||
rm.start();
|
||||
|
||||
// Register node1
|
||||
String host0 = "host_0";
|
||||
String host1 = "host_1";
|
||||
MockNM nm1 =
|
||||
new MockNM(host0 + ":1234", 8 * GB, rm.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
// Register node2
|
||||
MockNM nm2 =
|
||||
new MockNM(host1 + ":2351", 8 * GB, rm.getResourceTrackerService());
|
||||
nm2.registerNode();
|
||||
|
||||
// submit 1st app
|
||||
RMApp app1 = rm.submitApp(1 * GB, "user_0", "a1");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
// Submit app1 RR with allocationReqId = 5
|
||||
int numContainers = 1;
|
||||
am1.addRequests(new String[] {host0, host1 }, 1 * GB, 1, numContainers,
|
||||
5L);
|
||||
AllocateResponse allocResponse = am1.schedule();
|
||||
|
||||
// wait for containers to be allocated.
|
||||
nm1.nodeHeartbeat(true);
|
||||
allocResponse = am1.schedule(); // send the request
|
||||
while (allocResponse.getAllocatedContainers().size() < 1) {
|
||||
LOG.info("Waiting for containers to be created for app 1...");
|
||||
Thread.sleep(100);
|
||||
allocResponse = am1.schedule();
|
||||
}
|
||||
|
||||
List<Container> allocated = allocResponse.getAllocatedContainers();
|
||||
Assert.assertEquals(1, allocated.size());
|
||||
checkAllocatedContainer(allocated.get(0), 1 * GB, nm1.getNodeId(), 5L);
|
||||
|
||||
// Submit another application
|
||||
RMApp app2 = rm.submitApp(1 * GB, "user_1", "a2");
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
|
||||
|
||||
// Submit app2 RR with allocationReqId = 5
|
||||
am2.addRequests(new String[] {host0, host1 }, 2 * GB, 1, numContainers,
|
||||
5L);
|
||||
am2.schedule();
|
||||
|
||||
// wait for containers to be allocated.
|
||||
nm2.nodeHeartbeat(true);
|
||||
allocResponse = am2.schedule(); // send the request
|
||||
while (allocResponse.getAllocatedContainers().size() < 1) {
|
||||
LOG.info("Waiting for containers to be created for app 1...");
|
||||
Thread.sleep(100);
|
||||
allocResponse = am2.schedule();
|
||||
}
|
||||
|
||||
allocated = allocResponse.getAllocatedContainers();
|
||||
Assert.assertEquals(1, allocated.size());
|
||||
checkAllocatedContainer(allocated.get(0), 2 * GB, nm2.getNodeId(), 5L);
|
||||
|
||||
// Now submit app2 RR with allocationReqId = 10
|
||||
am2.addRequests(new String[] {host0, host1 }, 3 * GB, 1, numContainers,
|
||||
10L);
|
||||
am2.schedule();
|
||||
|
||||
// wait for containers to be allocated.
|
||||
nm1.nodeHeartbeat(true);
|
||||
allocResponse = am2.schedule(); // send the request
|
||||
while (allocResponse.getAllocatedContainers().size() < 1) {
|
||||
LOG.info("Waiting for containers to be created for app 1...");
|
||||
Thread.sleep(100);
|
||||
allocResponse = am2.schedule();
|
||||
}
|
||||
|
||||
allocated = allocResponse.getAllocatedContainers();
|
||||
Assert.assertEquals(1, allocated.size());
|
||||
checkAllocatedContainer(allocated.get(0), 3 * GB, nm1.getNodeId(), 10L);
|
||||
|
||||
// Now submit app1 RR with allocationReqId = 10
|
||||
am1.addRequests(new String[] {host0, host1 }, 4 * GB, 1, numContainers,
|
||||
10L);
|
||||
am1.schedule();
|
||||
|
||||
// wait for containers to be allocated.
|
||||
nm2.nodeHeartbeat(true);
|
||||
allocResponse = am1.schedule(); // send the request
|
||||
while (allocResponse.getAllocatedContainers().size() < 1) {
|
||||
LOG.info("Waiting for containers to be created for app 1...");
|
||||
Thread.sleep(100);
|
||||
allocResponse = am1.schedule();
|
||||
}
|
||||
|
||||
allocated = allocResponse.getAllocatedContainers();
|
||||
Assert.assertEquals(1, allocated.size());
|
||||
checkAllocatedContainer(allocated.get(0), 4 * GB, nm2.getNodeId(), 10L);
|
||||
} finally {
|
||||
if (rm != null) {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
|
@ -412,4 +411,11 @@ public class TestUtils {
|
|||
return SchedulerRequestKey.create(ResourceRequest.newInstance(
|
||||
Priority.newInstance(pri), null, null, 0));
|
||||
}
|
||||
|
||||
public static SchedulerRequestKey toSchedulerKey(Priority pri,
|
||||
long allocationRequestId) {
|
||||
ResourceRequest req = ResourceRequest.newInstance(pri, null, null, 0);
|
||||
req.setAllocationRequestId(allocationRequestId);
|
||||
return SchedulerRequestKey.create(req);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue