YARN-10259. Fix reservation logic in Multi Node Placement.
Reviewed by Wangda Tan.
(cherry picked from commit 6ce295b787
)
This commit is contained in:
parent
b0c9e4f1b5
commit
07b8963aa3
|
@ -1008,11 +1008,15 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
private CSAssignment allocateFromReservedContainer(Resource clusterResource,
|
private CSAssignment allocateFromReservedContainer(Resource clusterResource,
|
||||||
CandidateNodeSet<FiCaSchedulerNode> candidates,
|
CandidateNodeSet<FiCaSchedulerNode> candidates,
|
||||||
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
|
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
|
||||||
// Considering multi-node scheduling, its better to iterate through
|
|
||||||
// all candidates and stop once we get atleast one good node to allocate
|
// Irrespective of Single / Multi Node Placement, the allocate from
|
||||||
// where reservation was made earlier. In normal case, there is only one
|
// Reserved Container has to happen only for the single node which
|
||||||
// node and hence there wont be any impact after this change.
|
// CapacityScheduler#allocateFromReservedContainer invokes with.
|
||||||
for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
|
// Else In Multi Node Placement, there won't be any Allocation or
|
||||||
|
// Reserve of new containers when there is a RESERVED container on
|
||||||
|
// a node which is full.
|
||||||
|
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
|
||||||
|
if (node != null) {
|
||||||
RMContainer reservedContainer = node.getReservedContainer();
|
RMContainer reservedContainer = node.getReservedContainer();
|
||||||
if (reservedContainer != null) {
|
if (reservedContainer != null) {
|
||||||
FiCaSchedulerApp application = getApplication(
|
FiCaSchedulerApp application = getApplication(
|
||||||
|
|
|
@ -837,6 +837,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
||||||
// Do checks before determining which node to allocate
|
// Do checks before determining which node to allocate
|
||||||
// Directly return if this check fails.
|
// Directly return if this check fails.
|
||||||
ContainerAllocation result;
|
ContainerAllocation result;
|
||||||
|
ContainerAllocation lastReservation = null;
|
||||||
|
|
||||||
AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
|
AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
|
||||||
application.getAppSchedulingInfo().getAppPlacementAllocator(
|
application.getAppSchedulingInfo().getAppPlacementAllocator(
|
||||||
|
@ -878,11 +879,24 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
||||||
result = tryAllocateOnNode(clusterResource, node, schedulingMode,
|
result = tryAllocateOnNode(clusterResource, node, schedulingMode,
|
||||||
resourceLimits, schedulerKey, reservedContainer);
|
resourceLimits, schedulerKey, reservedContainer);
|
||||||
|
|
||||||
if (AllocationState.ALLOCATED == result.getAllocationState()
|
if (AllocationState.ALLOCATED == result.getAllocationState()) {
|
||||||
|| AllocationState.RESERVED == result.getAllocationState()) {
|
|
||||||
result = doAllocation(result, node, schedulerKey, reservedContainer);
|
result = doAllocation(result, node, schedulerKey, reservedContainer);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// In MultiNodePlacement, Try Allocate on other Available nodes
|
||||||
|
// from Iterator as well before Reserving. Else there won't be any
|
||||||
|
// Allocate of new containers when the first node in the
|
||||||
|
// iterator could not fit and returns RESERVED allocation.
|
||||||
|
if (AllocationState.RESERVED == result.getAllocationState()) {
|
||||||
|
lastReservation = result;
|
||||||
|
if (iter.hasNext()) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
result = doAllocation(lastReservation, node, schedulerKey,
|
||||||
|
reservedContainer);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
@ -223,6 +224,7 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
|
||||||
|
|
||||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
|
||||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
|
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
|
||||||
FiCaSchedulerApp schedulerApp1 =
|
FiCaSchedulerApp schedulerApp1 =
|
||||||
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
||||||
|
@ -234,12 +236,13 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
|
||||||
* after its ask has been cancelled when used capacity of root queue is 1.
|
* after its ask has been cancelled when used capacity of root queue is 1.
|
||||||
*/
|
*/
|
||||||
// Ask a container with 6GB memory size for app1,
|
// Ask a container with 6GB memory size for app1,
|
||||||
// nm1 will reserve a container for app1
|
// nm2 will reserve a container for app1
|
||||||
|
// Last Node from Node Iterator will be RESERVED
|
||||||
am1.allocate("*", 6 * GB, 1, new ArrayList<>());
|
am1.allocate("*", 6 * GB, 1, new ArrayList<>());
|
||||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
|
||||||
|
|
||||||
// Check containers of app1 and app2.
|
// Check containers of app1 and app2.
|
||||||
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
|
Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
|
||||||
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
||||||
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
|
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
|
||||||
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
|
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
|
||||||
|
@ -324,12 +327,13 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
|
||||||
* after node has sufficient resource.
|
* after node has sufficient resource.
|
||||||
*/
|
*/
|
||||||
// Ask a container with 6GB memory size for app2,
|
// Ask a container with 6GB memory size for app2,
|
||||||
// nm1 will reserve a container for app2
|
// nm2 will reserve a container for app2
|
||||||
|
// Last Node from Node Iterator will be RESERVED
|
||||||
am2.allocate("*", 6 * GB, 1, new ArrayList<>());
|
am2.allocate("*", 6 * GB, 1, new ArrayList<>());
|
||||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
|
||||||
// Check containers of app1 and app2.
|
// Check containers of app1 and app2.
|
||||||
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
|
Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
|
||||||
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
||||||
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
|
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
|
||||||
Assert.assertEquals(1, schedulerApp2.getReservedContainers().size());
|
Assert.assertEquals(1, schedulerApp2.getReservedContainers().size());
|
||||||
|
@ -344,4 +348,100 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
|
||||||
|
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testAllocateOfReservedContainerFromAnotherNode()
|
||||||
|
throws Exception {
|
||||||
|
CapacitySchedulerConfiguration newConf =
|
||||||
|
new CapacitySchedulerConfiguration(conf);
|
||||||
|
newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
||||||
|
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
||||||
|
newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
|
||||||
|
+ ".resource-based.sorting-interval.ms", 0);
|
||||||
|
newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
|
||||||
|
1.0f);
|
||||||
|
MockRM rm1 = new MockRM(newConf);
|
||||||
|
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("h1:1234", 12 * GB, 2);
|
||||||
|
MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB, 2);
|
||||||
|
|
||||||
|
// launch an app1 to queue, AM container will be launched in nm1
|
||||||
|
RMApp app1 = MockRMAppSubmitter.submit(rm1,
|
||||||
|
MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
|
||||||
|
.withAppName("app")
|
||||||
|
.withUser("user")
|
||||||
|
.withAcls(null)
|
||||||
|
.withQueue("default")
|
||||||
|
.build());
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
|
// launch another app2 to queue, AM container will be launched in nm2
|
||||||
|
RMApp app2 = MockRMAppSubmitter.submit(rm1,
|
||||||
|
MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
|
||||||
|
.withAppName("app")
|
||||||
|
.withUser("user")
|
||||||
|
.withAcls(null)
|
||||||
|
.withQueue("default")
|
||||||
|
.build());
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
|
||||||
|
|
||||||
|
// Reserve a Container for app3
|
||||||
|
RMApp app3 = MockRMAppSubmitter.submit(rm1,
|
||||||
|
MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
|
||||||
|
.withAppName("app")
|
||||||
|
.withUser("user")
|
||||||
|
.withAcls(null)
|
||||||
|
.withQueue("default")
|
||||||
|
.build());
|
||||||
|
|
||||||
|
final AtomicBoolean result = new AtomicBoolean(false);
|
||||||
|
Thread t = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
|
||||||
|
result.set(true);
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.fail("Failed to allocate the reserved container");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
t.start();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// Validate if app3 has got RESERVED container
|
||||||
|
FiCaSchedulerApp schedulerApp =
|
||||||
|
cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
|
||||||
|
Assert.assertEquals("App3 failed to get reserved container", 1,
|
||||||
|
schedulerApp.getReservedContainers().size());
|
||||||
|
|
||||||
|
// Free the Space on other node where Reservation has not happened
|
||||||
|
if (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
|
||||||
|
rm1.killApp(app2.getApplicationId());
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
|
||||||
|
} else {
|
||||||
|
rm1.killApp(app1.getApplicationId());
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if Reserved AM of app3 gets allocated in
|
||||||
|
// node where space available
|
||||||
|
while (!result.get()) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate release of reserved containers
|
||||||
|
schedulerApp =
|
||||||
|
cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
|
||||||
|
Assert.assertEquals("App3 failed to release Reserved container", 0,
|
||||||
|
schedulerApp.getReservedContainers().size());
|
||||||
|
Assert.assertNull(cs.getNode(rmNode1.getNodeID()).getReservedContainer());
|
||||||
|
Assert.assertNull(cs.getNode(rmNode2.getNodeID()).getReservedContainer());
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue