From 9c3fc3ef2865164aa5f121793ac914cfeb21a181 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Tue, 21 Aug 2018 22:42:23 +0800 Subject: [PATCH] YARN-7494. Add multi-node lookup mechanism and pluggable nodes sorting policies to optimize placement decision. Contributed by Sunil Govindan. --- .../RMActiveServiceContext.java | 16 ++ .../server/resourcemanager/RMContext.java | 8 +- .../server/resourcemanager/RMContextImpl.java | 14 +- .../resourcemanager/ResourceManager.java | 12 ++ .../scheduler/AppSchedulingInfo.java | 11 +- .../scheduler/ClusterNodeTracker.java | 61 +++++++ .../activities/ActivitiesLogger.java | 32 ++-- .../activities/ActivitiesManager.java | 8 +- .../scheduler/capacity/AbstractCSQueue.java | 16 +- .../scheduler/capacity/CSQueue.java | 6 + .../scheduler/capacity/CapacityScheduler.java | 77 +++++++- .../CapacitySchedulerConfiguration.java | 116 ++++++++++++ .../scheduler/capacity/LeafQueue.java | 47 +++-- .../scheduler/capacity/ParentQueue.java | 4 +- .../allocator/RegularContainerAllocator.java | 35 ++-- .../common/ApplicationSchedulingConfig.java | 4 + .../common/fica/FiCaSchedulerApp.java | 23 +++ .../LocalityAppPlacementAllocator.java | 34 +++- .../placement/MultiNodeLookupPolicy.java | 67 +++++++ .../placement/MultiNodePolicySpec.java | 56 ++++++ .../scheduler/placement/MultiNodeSorter.java | 167 ++++++++++++++++++ .../placement/MultiNodeSortingManager.java | 139 +++++++++++++++ .../ResourceUsageMultiNodeLookupPolicy.java | 79 +++++++++ .../ReservationSystemTestUtil.java | 3 + .../scheduler/TestAppSchedulingInfo.java | 3 +- .../capacity/CapacitySchedulerTestBase.java | 13 ++ .../capacity/TestCapacityScheduler.java | 15 -- .../TestCapacitySchedulerMultiNodes.java | 166 +++++++++++++++++ .../TestCapacitySchedulerNodeLabelUpdate.java | 70 ++++++++ 29 files changed, 1210 insertions(+), 92 deletions(-) create mode 100644 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceUsageMultiNodeLookupPolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 66065e33bae..8fb0de63fdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -43,9 +43,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -113,6 +115,7 @@ public class RMActiveServiceContext { private AllocationTagsManager allocationTagsManager; private PlacementConstraintManager placementConstraintManager; private ResourceProfilesManager resourceProfilesManager; + private MultiNodeSortingManager multiNodeSortingManager; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -441,6 +444,19 @@ public class RMActiveServiceContext { rmDelegatedNodeLabelsUpdater = nodeLablesUpdater; } + @Private + @Unstable + public MultiNodeSortingManager getMultiNodeSortingManager() { + return multiNodeSortingManager; + } + + @Private + @Unstable + public void setMultiNodeSortingManager( + MultiNodeSortingManager multiNodeSortingManager) { + 
this.multiNodeSortingManager = multiNodeSortingManager; + } + @Private @Unstable public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index eb91a311a3a..a30ff76a6ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -42,10 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -177,4 +178,9 @@ public interface RMContext extends 
ApplicationMasterServiceContext { void setPlacementConstraintManager( PlacementConstraintManager placementConstraintManager); + + MultiNodeSortingManager getMultiNodeSortingManager(); + + void setMultiNodeSortingManager( + MultiNodeSortingManager multiNodeSortingManager); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 84e0f6f6b58..cb1d56f34fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -48,10 +48,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -538,6 +539,17 @@ public class RMContextImpl implements RMContext { delegatedNodeLabelsUpdater); } + @Override + public MultiNodeSortingManager getMultiNodeSortingManager() { + return activeServiceContext.getMultiNodeSortingManager(); + } + + @Override + public void setMultiNodeSortingManager( + MultiNodeSortingManager multiNodeSortingManager) { + activeServiceContext.setMultiNodeSortingManager(multiNodeSortingManager); + } + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index d459f0e952b..bdda8717f6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -96,11 +96,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; @@ -546,6 +548,10 @@ public class ResourceManager extends CompositeService return new FederationStateStoreService(rmContext); } + protected MultiNodeSortingManager createMultiNodeSortingManager() { + return new MultiNodeSortingManager(); + } + protected SystemMetricsPublisher createSystemMetricsPublisher() { List publishers = new ArrayList(); @@ -665,6 +671,12 @@ public class ResourceManager extends CompositeService resourceProfilesManager.init(conf); rmContext.setResourceProfilesManager(resourceProfilesManager); + MultiNodeSortingManager multiNodeSortingManager = + createMultiNodeSortingManager(); + multiNodeSortingManager.setRMContext(rmContext); + addService(multiNodeSortingManager); + rmContext.setMultiNodeSortingManager(multiNodeSortingManager); + RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = createRMDelegatedNodeLabelsUpdater(); if (delegatedNodeLabelsUpdater != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index d63d2b82fbd..ca7d9ce712e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -93,7 +93,7 @@ public class AppSchedulingInfo { private final ReentrantReadWriteLock.WriteLock writeLock; public final ContainerUpdateContext updateContext; - public final Map applicationSchedulingEnvs = new HashMap<>(); + private final Map applicationSchedulingEnvs = new HashMap<>(); private final RMContext rmContext; public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, @@ -782,4 +782,13 @@ public class AppSchedulingInfo { this.readLock.unlock(); } } + + /** + * Get scheduling envs configured for this application. 
+ * + * @return a map of applicationSchedulingEnvs + */ + public Map getApplicationSchedulingEnvs() { + return applicationSchedulingEnvs; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index 66d88108932..8c7e4479dec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -37,6 +37,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -57,6 +58,7 @@ public class ClusterNodeTracker { private HashMap nodes = new HashMap<>(); private Map nodeNameToNodeMap = new HashMap<>(); private Map> nodesPerRack = new HashMap<>(); + private Map> nodesPerLabel = new HashMap<>(); private Resource clusterCapacity = Resources.createResource(0, 0); private volatile Resource staleClusterCapacity = @@ -80,6 +82,16 @@ public class ClusterNodeTracker { nodes.put(node.getNodeID(), node); nodeNameToNodeMap.put(node.getNodeName(), node); + List nodesPerLabels = nodesPerLabel.get(node.getPartition()); + + if (nodesPerLabels == null) { + nodesPerLabels = new ArrayList(); + } + nodesPerLabels.add(node); + + // Update new set of nodes for given partition. 
+ nodesPerLabel.put(node.getPartition(), nodesPerLabels); + // Update nodes per rack as well String rackName = node.getRackName(); List nodesList = nodesPerRack.get(rackName); @@ -174,6 +186,16 @@ public class ClusterNodeTracker { } } + List nodesPerPartition = nodesPerLabel.get(node.getPartition()); + nodesPerPartition.remove(node); + + // Update new set of nodes for given partition. + if (nodesPerPartition.isEmpty()) { + nodesPerLabel.remove(node.getPartition()); + } else { + nodesPerLabel.put(node.getPartition(), nodesPerPartition); + } + // Update cluster capacity Resources.subtractFrom(clusterCapacity, node.getTotalResource()); staleClusterCapacity = Resources.clone(clusterCapacity); @@ -420,4 +442,43 @@ public class ClusterNodeTracker { } return retNodes; } + + /** + * update cached nodes per partition on a node label change event. + * @param partition nodeLabel + * @param nodeIds List of Node IDs + */ + public void updateNodesPerPartition(String partition, Set nodeIds) { + writeLock.lock(); + try { + // Clear all entries. + nodesPerLabel.remove(partition); + + List nodesPerPartition = new ArrayList(); + for (NodeId nodeId : nodeIds) { + N n = getNode(nodeId); + if (n != null) { + nodesPerPartition.add(n); + } + } + + // Update new set of nodes for given partition. 
+ nodesPerLabel.put(partition, nodesPerPartition); + } finally { + writeLock.unlock(); + } + } + + public List getNodesPerPartition(String partition) { + List nodesPerPartition = null; + readLock.lock(); + try { + if (nodesPerLabel.containsKey(partition)) { + nodesPerPartition = new ArrayList(nodesPerLabel.get(partition)); + } + } finally { + readLock.unlock(); + } + return nodesPerPartition; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 0c351b671a3..8a3ffce1d8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -21,13 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; /** * Utility for logging scheduler activities @@ -63,7 +63,7 @@ public class ActivitiesLogger { SchedulerApplicationAttempt application, Priority priority, String diagnostic) { String type = "app"; - if (activitiesManager == null) { + if (node == null || activitiesManager == null) { return; } if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { @@ -84,18 +84,18 @@ public class ActivitiesLogger { ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, Priority priority, String diagnostic, ActivityState appState) { - if (activitiesManager == null) { + if (node == null || activitiesManager == null) { return; } if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { String type = "container"; // Add application-container activity into specific node allocation. - activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + activitiesManager.addSchedulingActivityForNode(node, application.getApplicationId().toString(), null, priority.toString(), ActivityState.SKIPPED, diagnostic, type); type = "app"; // Add queue-application activity into specific node allocation. - activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + activitiesManager.addSchedulingActivityForNode(node, application.getQueueName(), application.getApplicationId().toString(), application.getPriority().toString(), ActivityState.SKIPPED, @@ -121,20 +121,20 @@ public class ActivitiesLogger { ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, RMContainer updatedContainer, ActivityState activityState) { - if (activitiesManager == null) { + if (node == null || activitiesManager == null) { return; } if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { String type = "container"; // Add application-container activity into specific node allocation. 
- activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + activitiesManager.addSchedulingActivityForNode(node, application.getApplicationId().toString(), updatedContainer.getContainer().toString(), updatedContainer.getContainer().getPriority().toString(), activityState, ActivityDiagnosticConstant.EMPTY, type); type = "app"; // Add queue-application activity into specific node allocation. - activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + activitiesManager.addSchedulingActivityForNode(node, application.getQueueName(), application.getApplicationId().toString(), application.getPriority().toString(), ActivityState.ACCEPTED, @@ -157,13 +157,15 @@ public class ActivitiesLogger { * update. */ public static void startAppAllocationRecording( - ActivitiesManager activitiesManager, NodeId nodeId, long currentTime, + ActivitiesManager activitiesManager, FiCaSchedulerNode node, + long currentTime, SchedulerApplicationAttempt application) { - if (activitiesManager == null) { + if (node == null || activitiesManager == null) { return; } - activitiesManager.startAppAllocationRecording(nodeId, currentTime, - application); + activitiesManager + .startAppAllocationRecording(node.getNodeID(), currentTime, + application); } /* @@ -208,7 +210,7 @@ public class ActivitiesLogger { public static void recordQueueActivity(ActivitiesManager activitiesManager, SchedulerNode node, String parentQueueName, String queueName, ActivityState state, String diagnostic) { - if (activitiesManager == null) { + if (node == null || activitiesManager == null) { return; } if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { @@ -240,7 +242,7 @@ public class ActivitiesLogger { public static void finishAllocatedNodeAllocation( ActivitiesManager activitiesManager, SchedulerNode node, ContainerId containerId, AllocationState containerState) { - if (activitiesManager == null) { + if (node == null || activitiesManager == null) { return; } if 
(activitiesManager.shouldRecordThisNode(node.getNodeID())) { @@ -277,7 +279,7 @@ public class ActivitiesLogger { SchedulerNode node, String parentName, String childName, Priority priority, ActivityState state, String diagnostic, String type) { - activitiesManager.addSchedulingActivityForNode(node.getNodeID(), parentName, + activitiesManager.addSchedulingActivityForNode(node, parentName, childName, priority != null ? priority.toString() : null, state, diagnostic, type); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java index 8498c40cdef..5d96b17371e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; import org.apache.hadoop.yarn.util.SystemClock; @@ -197,11 +198,12 @@ public class ActivitiesManager extends AbstractService { } // Add queue, application or container activity 
into specific node allocation. - void addSchedulingActivityForNode(NodeId nodeID, String parentName, + void addSchedulingActivityForNode(SchedulerNode node, String parentName, String childName, String priority, ActivityState state, String diagnostic, String type) { - if (shouldRecordThisNode(nodeID)) { - NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID); + if (shouldRecordThisNode(node.getNodeID())) { + NodeAllocation nodeAllocation = getCurrentNodeAllocation( + node.getNodeID()); nodeAllocation.addAllocationActivity(parentName, childName, priority, state, diagnostic, type); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 9c3e98f0e71..2c9f9a37255 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -92,7 +92,8 @@ public abstract class AbstractCSQueue implements CSQueue { Set resourceTypes; final RMNodeLabelsManager labelManager; String defaultLabelExpression; - + private String multiNodeSortingPolicyName = null; + Map acls = new HashMap(); volatile boolean reservationsContinueLooking; @@ -414,6 +415,10 @@ public abstract class AbstractCSQueue implements CSQueue { this.priority = configuration.getQueuePriority( getQueuePath()); + // Update multi-node sorting algorithm for scheduling as configured. 
+ setMultiNodeSortingPolicyName( + configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath())); + this.userWeights = getUserWeightsFromHierarchy(configuration); } finally { writeLock.unlock(); @@ -1259,4 +1264,13 @@ public abstract class AbstractCSQueue implements CSQueue { this.writeLock.unlock(); } } + + @Override + public String getMultiNodeSortingPolicyName() { + return this.multiNodeSortingPolicyName; + } + + public void setMultiNodeSortingPolicyName(String policyName) { + this.multiNodeSortingPolicyName = policyName; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 3963dc0a5b7..c0c280e9f1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -430,4 +430,10 @@ public interface CSQueue extends SchedulerQueue { * @return effective max queue capacity */ Resource getEffectiveMaxCapacityDown(String label, Resource factor); + + /** + * Get Multi Node scheduling policy name. 
+ * @return policy name + */ + String getMultiNodeSortingPolicyName(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 0b7fe92b495..dec1301392e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -251,6 +252,7 @@ public class CapacityScheduler extends private ResourceCommitterService resourceCommitterService; private RMNodeLabelsManager labelManager; private AppPriorityACLsManager appPriorityACLManager; + private boolean multiNodePlacementEnabled; private static boolean printedVerboseLoggingForAsyncScheduling = false; @@ -391,12 +393,23 @@ public class CapacityScheduler extends // Setup how many containers we can allocate for each round offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + // Register CS specific multi-node policies to common MultiNodeManager + // which will add to a MultiNodeSorter which gives a pre-sorted list of + // nodes to scheduler's allocation. 
+ multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled(); + if(rmContext.getMultiNodeSortingManager() != null) { + rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames( + multiNodePlacementEnabled, + this.conf.getMultiNodePlacementPolicies()); + } + LOG.info("Initialized CapacityScheduler with " + "calculator=" + getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + "asynchronousScheduling=" + scheduleAsynchronously + ", " + "asyncScheduleInterval=" - + asyncScheduleInterval + "ms"); + + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled=" + + multiNodePlacementEnabled); } finally { writeLock.unlock(); } @@ -1373,18 +1386,23 @@ public class CapacityScheduler extends assignment.getAssignmentInformation().getAllocationDetails(); List reservations = assignment.getAssignmentInformation().getReservationDetails(); + // Get nodeId from allocated container if incoming argument is null. + NodeId updatedNodeid = (nodeId == null) + ? 
allocations.get(allocations.size() - 1).rmContainer.getNodeId() + : nodeId; + if (!allocations.isEmpty()) { ContainerId allocatedContainerId = allocations.get(allocations.size() - 1).containerId; String allocatedQueue = allocations.get(allocations.size() - 1).queue; - schedulerHealth.updateAllocation(now, nodeId, allocatedContainerId, + schedulerHealth.updateAllocation(now, updatedNodeid, allocatedContainerId, allocatedQueue); } if (!reservations.isEmpty()) { ContainerId reservedContainerId = reservations.get(reservations.size() - 1).containerId; String reservedQueue = reservations.get(reservations.size() - 1).queue; - schedulerHealth.updateReservation(now, nodeId, reservedContainerId, + schedulerHealth.updateReservation(now, updatedNodeid, reservedContainerId, reservedQueue); } schedulerHealth.updateSchedulerReservationCounts(assignment @@ -1421,6 +1439,23 @@ public class CapacityScheduler extends || assignedContainers < maxAssignPerHeartbeat); } + private CandidateNodeSet getCandidateNodeSet( + FiCaSchedulerNode node) { + CandidateNodeSet candidates = null; + candidates = new SimpleCandidateNodeSet<>(node); + if (multiNodePlacementEnabled) { + Map nodesByPartition = new HashMap<>(); + List nodes = nodeTracker + .getNodesPerPartition(node.getPartition()); + if (nodes != null && !nodes.isEmpty()) { + nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); + candidates = new SimpleCandidateNodeSet( + nodesByPartition, node.getPartition()); + } + } + return candidates; + } + /** * We need to make sure when doing allocation, Node should be existed * And we will construct a {@link CandidateNodeSet} before proceeding @@ -1432,8 +1467,8 @@ public class CapacityScheduler extends int offswitchCount = 0; int assignedContainers = 0; - CandidateNodeSet candidates = - new SimpleCandidateNodeSet<>(node); + CandidateNodeSet candidates = getCandidateNodeSet( + node); CSAssignment assignment = allocateContainersToNode(candidates, withNodeHeartbeat); // Only check if we can 
allocate more container on the same node when @@ -1599,10 +1634,13 @@ public class CapacityScheduler extends if (Resources.greaterThan(calculator, getClusterResource(), assignment.getResource(), Resources.none())) { + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); + NodeId nodeId = null; + if (node != null) { + nodeId = node.getNodeID(); + } if (withNodeHeartbeat) { - updateSchedulerHealth(lastNodeUpdateTime, - CandidateNodeSetUtils.getSingleNode(candidates).getNodeID(), - assignment); + updateSchedulerHealth(lastNodeUpdateTime, nodeId, assignment); } return assignment; } @@ -1681,7 +1719,7 @@ public class CapacityScheduler extends // We have two different logics to handle allocation on single node / multi // nodes. CSAssignment assignment; - if (null != node) { + if (!multiNodePlacementEnabled) { assignment = allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); } else{ @@ -1869,12 +1907,21 @@ public class CapacityScheduler extends NodeLabelsUpdateSchedulerEvent labelUpdateEvent) { try { writeLock.lock(); + Set updateLabels = new HashSet(); for (Entry> entry : labelUpdateEvent .getUpdatedNodeToLabels().entrySet()) { NodeId id = entry.getKey(); Set labels = entry.getValue(); + FiCaSchedulerNode node = nodeTracker.getNode(id); + + if (node != null) { + // Update old partition to list. 
+ updateLabels.add(node.getPartition()); + } updateLabelsOnNode(id, labels); + updateLabels.addAll(labels); } + refreshLabelToNodeCache(updateLabels); Resource clusterResource = getClusterResource(); getRootQueue().updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); @@ -1883,6 +1930,18 @@ public class CapacityScheduler extends } } + private void refreshLabelToNodeCache(Set updateLabels) { + Map> labelMapping = labelManager + .getLabelsToNodes(updateLabels); + for (String label : updateLabels) { + Set nodes = labelMapping.get(label); + if (nodes == null) { + continue; + } + nodeTracker.updateNodesPerPartition(label, nodes); + } + } + private void addNode(RMNode nodeManager) { try { writeLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index e8de096c6a4..b937ae7b1e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -2129,4 +2131,118 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur break; } } + + @Private public static final String MULTI_NODE_SORTING_POLICIES = + PREFIX + "multi-node-sorting.policy.names"; + + @Private public static final String MULTI_NODE_SORTING_POLICY_NAME = + PREFIX + "multi-node-sorting.policy"; + + /** + * resource usage based node sorting algorithm. + */ + public static final String DEFAULT_NODE_SORTING_POLICY = "default"; + public static final String DEFAULT_NODE_SORTING_POLICY_CLASSNAME + = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy"; + public static final long DEFAULT_MULTI_NODE_SORTING_INTERVAL = 1000L; + + @Private + public static final String MULTI_NODE_PLACEMENT_ENABLED = PREFIX + + "multi-node-placement-enabled"; + + @Private + public static final boolean DEFAULT_MULTI_NODE_PLACEMENT_ENABLED = false; + + public String getMultiNodesSortingAlgorithmPolicy( + String queue) { + + String policyName = get( + getQueuePrefix(queue) + "multi-node-sorting.policy"); + + if (policyName == null) { + policyName = get(MULTI_NODE_SORTING_POLICY_NAME); + } + + // If node sorting policy is not configured in queue and in cluster level, + // it is been assumed that this queue is not enabled with multi-node lookup. 
+ if (policyName == null || policyName.isEmpty()) { + return null; + } + + String policyClassName = get(MULTI_NODE_SORTING_POLICY_NAME + DOT + + policyName.trim() + DOT + "class"); + + if (policyClassName == null || policyClassName.isEmpty()) { + throw new YarnRuntimeException( + policyName.trim() + " Class is not configured or not an instance of " + + MultiNodeLookupPolicy.class.getCanonicalName()); + } + + return normalizePolicyName(policyClassName.trim()); + } + + public boolean getMultiNodePlacementEnabled() { + return getBoolean(MULTI_NODE_PLACEMENT_ENABLED, + DEFAULT_MULTI_NODE_PLACEMENT_ENABLED); + } + + public Set getMultiNodePlacementPolicies() { + String[] policies = getTrimmedStrings(MULTI_NODE_SORTING_POLICIES); + + // In other cases, split the accessibleLabelStr by "," + Set set = new HashSet(); + for (String str : policies) { + if (!str.trim().isEmpty()) { + String policyClassName = get( + MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + DOT + "class"); + if (str.trim().equals(DEFAULT_NODE_SORTING_POLICY)) { + policyClassName = get( + MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + DOT + "class", + DEFAULT_NODE_SORTING_POLICY_CLASSNAME); + } + + // This check is needed as default class name is loaded only for + // DEFAULT_NODE_SORTING_POLICY. 
+ if (policyClassName == null) { + throw new YarnRuntimeException( + str.trim() + " Class is not configured or not an instance of " + + MultiNodeLookupPolicy.class.getCanonicalName()); + } + policyClassName = normalizePolicyName(policyClassName.trim()); + long policySortingInterval = getLong( + MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + + DOT + "sorting-interval.ms", + DEFAULT_MULTI_NODE_SORTING_INTERVAL); + if (policySortingInterval < 0) { + throw new YarnRuntimeException( + str.trim() + + " multi-node policy is configured with invalid" + + " sorting-interval:" + policySortingInterval); + } + set.add( + new MultiNodePolicySpec(policyClassName, policySortingInterval)); + } + } + + return Collections.unmodifiableSet(set); + } + + private String normalizePolicyName(String policyName) { + + // Ensure that custom node sorting algorithm class is valid. + try { + Class nodeSortingPolicyClazz = getClassByName(policyName); + if (MultiNodeLookupPolicy.class + .isAssignableFrom(nodeSortingPolicyClazz)) { + return policyName; + } else { + throw new YarnRuntimeException( + "Class: " + policyName + " not instance of " + + MultiNodeLookupPolicy.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate " + "NodesSortingPolicy: " + policyName, e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 366bad0a4f2..ffe862fc618 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -53,10 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; @@ -1036,23 +1032,24 @@ public class LeafQueue extends AbstractCSQueue { private CSAssignment allocateFromReservedContainer(Resource clusterResource, CandidateNodeSet candidates, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { - FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); - if (null == node) { - return null; - } + // Considering multi-node scheduling, its better to iterate through + // all candidates and stop once we get atleast one good node to allocate + // where reservation was made earlier. In normal case, there is only one + // node and hence there wont be any impact after this change. 
+ for (FiCaSchedulerNode node : candidates.getAllNodes().values()) { + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + FiCaSchedulerApp application = getApplication( + reservedContainer.getApplicationAttemptId()); - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - FiCaSchedulerApp application = getApplication( - reservedContainer.getApplicationAttemptId()); - - if (null != application) { - ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, - node.getNodeID(), SystemClock.getInstance().getTime(), application); - CSAssignment assignment = application.assignContainers(clusterResource, - candidates, currentResourceLimits, schedulingMode, - reservedContainer); - return assignment; + if (null != application) { + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node, SystemClock.getInstance().getTime(), application); + CSAssignment assignment = application.assignContainers( + clusterResource, candidates, currentResourceLimits, + schedulingMode, reservedContainer); + return assignment; + } } } @@ -1114,13 +1111,14 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerApp application = assignmentIterator.next(); ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, - node.getNodeID(), SystemClock.getInstance().getTime(), application); + node, SystemClock.getInstance().getTime(), application); // Check queue max-capacity limit Resource appReserved = application.getCurrentReservation(); if (needAssignToQueueCheck) { - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), - currentResourceLimits, appReserved, schedulingMode)) { + if (!super.canAssignToThisQueue(clusterResource, + candidates.getPartition(), currentResourceLimits, appReserved, + schedulingMode)) { ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( activitiesManager, node, application, application.getPriority(), 
ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); @@ -1155,7 +1153,8 @@ public class LeafQueue extends AbstractCSQueue { userAssignable = false; } else { userAssignable = canAssignToUser(clusterResource, application.getUser(), - userLimit, application, node.getPartition(), currentResourceLimits); + userLimit, application, candidates.getPartition(), + currentResourceLimits); if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) { cul.canAssign = false; cul.reservation = appReserved; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 2363b8809e8..80549ca5c16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -553,8 +553,8 @@ public class ParentQueue extends AbstractCSQueue { ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, getParentName(), getQueueName(), ActivityState.REJECTED, - ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node - .getPartition()); + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + + candidates.getPartition()); if (rootQueue) { ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, node); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index a843002cdb5..3e337ef492d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -96,11 +96,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { * headroom, etc. */ private ContainerAllocation preCheckForNodeCandidateSet( - Resource clusterResource, CandidateNodeSet candidates, + Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) { Priority priority = schedulerKey.getPriority(); - FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey, ResourceRequest.ANY); @@ -164,7 +163,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } if (!checkHeadroom(clusterResource, resourceLimits, required, - candidates.getPartition())) { + node.getPartition())) { if (LOG.isDebugEnabled()) { LOG.debug("cannot allocate required resource=" + required + " because of headroom"); @@ -801,20 +800,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Do checks before determining which node to allocate // Directly return if this check fails. 
ContainerAllocation result; - if (reservedContainer == null) { - result = preCheckForNodeCandidateSet(clusterResource, candidates, - schedulingMode, resourceLimits, schedulerKey); - if (null != result) { - return result; - } - } else { - // pre-check when allocating reserved container - if (application.getOutstandingAsksCount(schedulerKey) == 0) { - // Release - return new ContainerAllocation(reservedContainer, null, - AllocationState.QUEUE_SKIPPED); - } - } AppPlacementAllocator schedulingPS = application.getAppSchedulingInfo().getAppPlacementAllocator( @@ -833,6 +818,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { while (iter.hasNext()) { FiCaSchedulerNode node = iter.next(); + if (reservedContainer == null) { + result = preCheckForNodeCandidateSet(clusterResource, node, + schedulingMode, resourceLimits, schedulerKey); + if (null != result) { + continue; + } + } else { + // pre-check when allocating reserved container + if (application.getOutstandingAsksCount(schedulerKey) == 0) { + // Release + result = new ContainerAllocation(reservedContainer, null, + AllocationState.QUEUE_SKIPPED); + continue; + } + } + result = tryAllocateOnNode(clusterResource, node, schedulingMode, resourceLimits, schedulerKey, reservedContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java index 1bd37431c15..06f74de96bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java @@ -32,4 +32,8 @@ public class ApplicationSchedulingConfig { @InterfaceAudience.Private public static final Class DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class; + + @InterfaceAudience.Private + public static final String ENV_MULTI_NODE_SORTING_POLICY_CLASS = + "MULTI_NODE_SORTING_POLICY_CLASS"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 6a5af814ed4..4bfdae9e728 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCap import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; @@ -170,10 +171,32 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { rc = scheduler.getResourceCalculator(); } + // Update multi-node sorting algorithm to scheduler envs + updateMultiNodeSortingPolicy(rmApp); + containerAllocator = new ContainerAllocator(this, rc, rmContext, activitiesManager); } + private void updateMultiNodeSortingPolicy(RMApp rmApp) { + if (rmApp == null) { + return; + } + + String queueName = null; + if (scheduler instanceof CapacityScheduler) { + queueName = getCSLeafQueue().getMultiNodeSortingPolicyName(); + } + + if (!appSchedulingInfo.getApplicationSchedulingEnvs().containsKey( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS) + && queueName != null) { + appSchedulingInfo.getApplicationSchedulingEnvs().put( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS, + queueName); + } + } + public boolean containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event, String partition) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index f1df3432a86..9d30e9065ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -24,11 +24,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -55,6 +58,8 @@ public class LocalityAppPlacementAllocator new ConcurrentHashMap<>(); private volatile String primaryRequestedPartition = RMNodeLabelsManager.NO_LABEL; + private MultiNodeSortingManager multiNodeSortingManager = null; + private String multiNodeSortPolicyName; private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; @@ -65,6 +70,26 @@ public class LocalityAppPlacementAllocator writeLock = lock.writeLock(); } + @SuppressWarnings("unchecked") + @Override + public void initialize(AppSchedulingInfo appSchedulingInfo, + SchedulerRequestKey schedulerRequestKey, 
RMContext rmContext) { + super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext); + multiNodeSortPolicyName = appSchedulingInfo + .getApplicationSchedulingEnvs().get( + ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS); + multiNodeSortingManager = (MultiNodeSortingManager) rmContext + .getMultiNodeSortingManager(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "nodeLookupPolicy used for " + appSchedulingInfo + .getApplicationId() + + " is " + ((multiNodeSortPolicyName != null) ? + multiNodeSortPolicyName : + "")); + } + } + @Override @SuppressWarnings("unchecked") public Iterator getPreferredNodeIterator( @@ -74,11 +99,16 @@ public class LocalityAppPlacementAllocator // in. N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); - if (null != singleNode) { + if (singleNode != null) { return IteratorUtils.singletonIterator(singleNode); } - return IteratorUtils.emptyIterator(); + // singleNode will be null if Multi-node placement lookup is enabled, and + // hence could consider sorting policies. 
+ return multiNodeSortingManager.getMultiNodeSortIterator( + candidateNodeSet.getAllNodes().values(), + candidateNodeSet.getPartition(), + multiNodeSortPolicyName); } private boolean hasRequestLabelChanged(ResourceRequest requestOne, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java new file mode 100644 index 00000000000..662e34d1dc6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; + +/** + *

+ * This class has the following functionality. + * + *

+ * Provide an interface for MultiNodeLookupPolicy so that different placement + * allocators can choose nodes based on need. + *

+ */ +public interface MultiNodeLookupPolicy { + /** + * Get iterator of preferred node depends on requirement and/or availability. + * + * @param nodes + * List of Nodes + * @param partition + * node label + * + * @return iterator of preferred node + */ + Iterator getPreferredNodeIterator(Collection nodes, String partition); + + /** + * Refresh working nodes set for re-ordering based on the algorithm selected. + * + * @param nodes + * a collection working nm's. + */ + void addAndRefreshNodesSet(Collection nodes, String partition); + + /** + * Get sorted nodes per partition. + * + * @param partition + * node label + * + * @return collection of sorted nodes + */ + Set getNodesPerPartition(String partition); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java new file mode 100644 index 00000000000..8386d78c6d5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +/** + * MultiNodePolicySpec contains policyName and timeout. + */ +public class MultiNodePolicySpec { + + private String policyName; + private long sortingInterval; + + public MultiNodePolicySpec(String policyName, long timeout) { + this.setSortingInterval(timeout); + this.setPolicyName(policyName); + } + + public long getSortingInterval() { + return sortingInterval; + } + + public void setSortingInterval(long timeout) { + this.sortingInterval = timeout; + } + + public String getPolicyName() { + return policyName; + } + + public void setPolicyName(String policyName) { + this.policyName = policyName; + } + + @Override + public String toString() { + return "MultiNodePolicySpec {" + + "policyName='" + policyName + '\'' + + ", sortingInterval=" + sortingInterval + + '}'; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java new file mode 100644 index 00000000000..7e27c34e268 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Common node sorting class which will 
do sorting based on policy spec. + * @param extends SchedulerNode. + */ +public class MultiNodeSorter extends AbstractService { + + private MultiNodeLookupPolicy multiNodePolicy; + private static final Log LOG = LogFactory.getLog(MultiNodeSorter.class); + + // ScheduledExecutorService which schedules the PreemptionChecker to run + // periodically. + private ScheduledExecutorService ses; + private ScheduledFuture handler; + private volatile boolean stopped; + private RMContext rmContext; + private MultiNodePolicySpec policySpec; + + public MultiNodeSorter(RMContext rmContext, + MultiNodePolicySpec policy) { + super("MultiNodeLookupPolicy"); + this.rmContext = rmContext; + this.policySpec = policy; + } + + @VisibleForTesting + public synchronized MultiNodeLookupPolicy getMultiNodeLookupPolicy() { + return multiNodePolicy; + } + + public void serviceInit(Configuration conf) throws Exception { + LOG.info("Initializing MultiNodeSorter=" + policySpec.getPolicyName() + + ", with sorting interval=" + policySpec.getSortingInterval()); + initPolicy(policySpec.getPolicyName()); + super.serviceInit(conf); + } + + @SuppressWarnings("unchecked") + void initPolicy(String policyName) throws YarnException { + Class policyClass; + try { + policyClass = Class.forName(policyName); + } catch (ClassNotFoundException e) { + throw new YarnException( + "Invalid policy name:" + policyName + e.getMessage()); + } + this.multiNodePolicy = (MultiNodeLookupPolicy) ReflectionUtils + .newInstance(policyClass, null); + } + + @Override + public void serviceStart() throws Exception { + LOG.info("Starting SchedulingMonitor=" + getName()); + assert !stopped : "starting when already stopped"; + ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(getName()); + return t; + } + }); + + // Start sorter thread only if sorting interval is a +ve value. 
+ if(policySpec.getSortingInterval() != 0) { + handler = ses.scheduleAtFixedRate(new SortingThread(), + 0, policySpec.getSortingInterval(), TimeUnit.MILLISECONDS); + } + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + stopped = true; + if (handler != null) { + LOG.info("Stop " + getName()); + handler.cancel(true); + ses.shutdown(); + } + super.serviceStop(); + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + public void reSortClusterNodes() { + Set nodeLabels = new HashSet<>(); + nodeLabels + .addAll(rmContext.getNodeLabelManager().getClusterNodeLabelNames()); + nodeLabels.add(RMNodeLabelsManager.NO_LABEL); + for (String label : nodeLabels) { + Map nodesByPartition = new HashMap<>(); + List nodes = ((AbstractYarnScheduler) rmContext + .getScheduler()).getNodeTracker().getNodesPerPartition(label); + if (nodes != null && !nodes.isEmpty()) { + nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); + multiNodePolicy.addAndRefreshNodesSet( + (Collection) nodesByPartition.values(), label); + } + } + } + + private class SortingThread implements Runnable { + @Override + public void run() { + try { + reSortClusterNodes(); + } catch (Throwable t) { + // The preemption monitor does not alter structures nor do structures + // persist across invocations. Therefore, log, skip, and retry. + LOG.error("Exception raised while executing multinode" + + " sorter, skip this run..., exception=", t); + } + } + } + + /** + * Verify whether sorter thread is running or not. + * + * @return true if sorter thread is running, false otherwise. 
+ */ + public boolean isSorterThreadRunning() { + return (handler != null); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java new file mode 100644 index 00000000000..e872317f715 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;

/**
 * Node Sorting Manager which runs all sorter threads and policies.
 *
 * @param <N> extends SchedulerNode
 */
public class MultiNodeSortingManager<N extends SchedulerNode>
    extends AbstractService {

  private static final Log LOG = LogFactory
      .getLog(MultiNodeSortingManager.class);

  private RMContext rmContext;
  private Map<String, MultiNodeSorter<N>> runningMultiNodeSorters;
  private Set<MultiNodePolicySpec> policySpecs =
      new HashSet<MultiNodePolicySpec>();
  private Configuration conf;
  private boolean multiNodePlacementEnabled;

  public MultiNodeSortingManager() {
    super("MultiNodeSortingManager");
    this.runningMultiNodeSorters = new ConcurrentHashMap<>();
  }

  @Override
  public void serviceInit(Configuration configuration) throws Exception {
    LOG.info("Initializing NodeSortingService=" + getName());
    super.serviceInit(configuration);
    this.conf = configuration;
  }

  @Override
  public void serviceStart() throws Exception {
    LOG.info("Starting NodeSortingService=" + getName());
    createAllPolicies();
    super.serviceStart();
  }

  @Override
  public void serviceStop() throws Exception {
    for (MultiNodeSorter<N> sorter : runningMultiNodeSorters.values()) {
      sorter.stop();
    }
    super.serviceStop();
  }

  /**
   * Creates, initializes and starts one sorter per registered policy spec.
   * Does nothing when multi-node placement is disabled.
   */
  private void createAllPolicies() {
    if (!multiNodePlacementEnabled) {
      return;
    }
    for (MultiNodePolicySpec policy : policySpecs) {
      MultiNodeSorter<N> mon = new MultiNodeSorter<N>(rmContext, policy);
      mon.init(conf);
      mon.start();
      runningMultiNodeSorters.put(policy.getPolicyName(), mon);
    }
  }

  /**
   * @param name
   *          policy name the sorter was registered under.
   * @return the running sorter for the policy, or null if there is none.
   */
  public MultiNodeSorter<N> getMultiNodePolicy(String name) {
    return runningMultiNodeSorters.get(name);
  }

  public void setRMContext(RMContext context) {
    this.rmContext = context;
  }

  /**
   * Records the policies to run and whether multi-node placement is enabled.
   * Sorters are created from these specs when the service starts.
   *
   * @param isMultiNodePlacementEnabled
   *          whether multi-node placement is switched on.
   * @param multiNodePlacementPolicies
   *          policy specs to add to the registered set.
   */
  public void registerMultiNodePolicyNames(
      boolean isMultiNodePlacementEnabled,
      Set<MultiNodePolicySpec> multiNodePlacementPolicies) {
    this.policySpecs.addAll(multiNodePlacementPolicies);
    this.multiNodePlacementEnabled = isMultiNodePlacementEnabled;
    LOG.info("MultiNode scheduling is '" + multiNodePlacementEnabled +
        "', and configured policies are " + StringUtils
        .join(policySpecs.iterator(), ","));
  }

  /**
   * Returns the candidate nodes in the order preferred by the named policy.
   * When no usable policy is available, only the first candidate is offered.
   *
   * @param nodes
   *          candidate nodes.
   * @param partition
   *          node label of the candidates.
   * @param policyName
   *          name of the sorting policy, may be null.
   * @return iterator over the preferred nodes; empty if no sorter is
   *         available and nodes is empty.
   */
  public Iterator<N> getMultiNodeSortIterator(Collection<N> nodes,
      String partition, String policyName) {
    // nodeLookupPolicy can be null if app is configured with invalid policy.
    // in such cases, use the first node.
    if (policyName == null) {
      LOG.warn("Multi Node scheduling is enabled, however invalid class is"
          + " configured. Valid sorting policy has to be configured in"
          + " yarn.scheduler.capacity.<queue-path>.multi-node-sorting.policy");
      return firstNodeIterator(nodes);
    }

    MultiNodeSorter<N> multiNodeSorter = getMultiNodePolicy(policyName);
    if (multiNodeSorter == null) {
      LOG.warn(
          "MultiNode policy '" + policyName + "' is configured, however "
              + "yarn.scheduler.capacity.multi-node-placement-enabled is false");
      return firstNodeIterator(nodes);
    }

    MultiNodeLookupPolicy<N> policy = multiNodeSorter
        .getMultiNodeLookupPolicy();
    // If sorter thread is not running, refresh node set.
    if (!multiNodeSorter.isSorterThreadRunning()) {
      policy.addAndRefreshNodesSet(nodes, partition);
    }

    return policy.getPreferredNodeIterator(nodes, partition);
  }

  /**
   * Builds the fallback iterator: the first candidate only, or an empty
   * iterator when there are no candidates (instead of failing on next()).
   */
  @SuppressWarnings("unchecked")
  private Iterator<N> firstNodeIterator(Collection<N> nodes) {
    Iterator<N> candidates = nodes.iterator();
    if (!candidates.hasNext()) {
      return candidates;
    }
    return IteratorUtils.singletonIterator(candidates.next());
  }
}
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Comparator; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; + +/** + *

+ * This class has the following functionality: + * + *

+ * ResourceUsageMultiNodeLookupPolicy holds sorted nodes list based on the + * resource usage of nodes at given time. + *

+ */ +public class ResourceUsageMultiNodeLookupPolicy + implements MultiNodeLookupPolicy { + + protected Map> nodesPerPartition = new ConcurrentHashMap<>(); + protected Comparator comparator; + + public ResourceUsageMultiNodeLookupPolicy() { + this.comparator = new Comparator() { + @Override + public int compare(N o1, N o2) { + int allocatedDiff = o1.getAllocatedResource() + .compareTo(o2.getAllocatedResource()); + if (allocatedDiff == 0) { + return o1.getNodeID().compareTo(o2.getNodeID()); + } + return allocatedDiff; + } + }; + } + + @Override + public Iterator getPreferredNodeIterator(Collection nodes, + String partition) { + return getNodesPerPartition(partition).iterator(); + } + + @Override + public void addAndRefreshNodesSet(Collection nodes, + String partition) { + Set nodeList = new ConcurrentSkipListSet(comparator); + nodeList.addAll(nodes); + nodesPerPartition.put(partition, Collections.unmodifiableSet(nodeList)); + } + + @Override + public Set getNodesPerPartition(String partition) { + return nodesPerPartition.getOrDefault(partition, Collections.emptySet()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index eef86a44990..09d3327263b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -56,6 +56,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -295,6 +296,8 @@ public class ReservationSystemTestUtil { }); mockRmContext.setNodeLabelManager(nlm); + mockRmContext + .setMultiNodeSortingManager(mock(MultiNodeSortingManager.class)); return mockRmContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index b7b0eb707bb..df8309b7f93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import 
org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; @@ -118,7 +119,7 @@ public class TestAppSchedulingInfo { doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); AppSchedulingInfo info = new AppSchedulingInfo( appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, - new ResourceUsage(), new HashMap<>(), null); + new ResourceUsage(), new HashMap<>(), mock(RMContext.class)); Assert.assertEquals(0, info.getSchedulerKeys().size()); Priority pri1 = Priority.newInstance(1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java index 5cea3a2f1ab..60e25ed83ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import com.google.common.collect.Sets; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.junit.Assert; import java.util.Set; @@ -76,4 +77,16 
@@ public class CapacitySchedulerTestBase { .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label) .getMemorySize() > 0); } + + /** + * Polls the scheduler every 100 ms until it reports at least nodecount + * cluster nodes or timesec seconds have elapsed. Returns normally in both + * cases, so callers must assert the node count themselves. + */ + protected void waitforNMRegistered(ResourceScheduler scheduler, int nodecount, + int timesec) throws InterruptedException { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < timesec * 1000) { + if (scheduler.getNumClusterNodes() < nodecount) { + Thread.sleep(100); + } else { + break; + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 8d948b57ba4..e77d8e21264 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -106,8 +106,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyConta import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.placement - .UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -172,7 +170,6 @@ import org.mockito.Mockito; import com.google.common.base.Supplier; import
com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -4871,18 +4868,6 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { return cs; } - private void waitforNMRegistered(ResourceScheduler scheduler, int nodecount, - int timesec) throws InterruptedException { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < timesec * 1000) { - if (scheduler.getNumClusterNodes() < nodecount) { - Thread.sleep(100); - } else { - break; - } - } - } - @Test (timeout = 60000) public void testClearRequestsBeforeApplyTheProposal() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java new file mode 100644 index 00000000000..c90af94a169 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java @@ -0,0 +1,166 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for Multi Node scheduling related tests. 
+ */ +public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase { + + private static final Log LOG = LogFactory + .getLog(TestCapacitySchedulerMultiNodes.class); + private CapacitySchedulerConfiguration conf; + private static final String POLICY_CLASS_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy"; + + /** + * Builds a capacity scheduler configuration that registers the + * "resource-based" sorting policy, maps it to POLICY_CLASS_NAME and + * enables multi-node placement. + */ + @Before + public void setUp() { + CapacitySchedulerConfiguration config = + new CapacitySchedulerConfiguration(); + config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + conf = new CapacitySchedulerConfiguration(config); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, + "resource-based"); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, + "resource-based"); + String policyName = + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + + ".resource-based" + ".class"; + conf.set(policyName, POLICY_CLASS_NAME); + conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, + true); + conf.setInt("yarn.scheduler.minimum-allocation-mb", 512); + conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1); + } + + /** + * Registers four nodes, forces a re-sort and checks that the policy holds + * all four nodes for the default partition. + */ + @Test + public void testMultiNodeSorterForScheduling() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + rm.registerNode("127.0.0.1:1234", 10 * GB); + rm.registerNode("127.0.0.1:1235", 10 * GB); + rm.registerNode("127.0.0.1:1236", 10 * GB); + rm.registerNode("127.0.0.1:1237", 10 * GB); + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + waitforNMRegistered(scheduler, 4, 5); + MultiNodeSortingManager mns = rm.getRMContext() + .getMultiNodeSortingManager(); + MultiNodeSorter sorter = mns + .getMultiNodePolicy(POLICY_CLASS_NAME); + sorter.reSortClusterNodes(); + Set nodes = sorter.getMultiNodeLookupPolicy() +
.getNodesPerPartition(""); + Assert.assertEquals(4, nodes.size()); + rm.stop(); + } + + /** + * Launches one AM on nm1 (2 GB) and one on nm2 (1 GB), re-sorts, and + * checks that the policy lists the nodes as nm3, nm4, nm2, nm1. + */ + @Test + public void testMultiNodeSorterForSchedulingWithOrdering() throws Exception { + MockRM rm = new MockRM(conf); + rm.start(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 10); + MockNM nm2 = rm.registerNode("127.0.0.2:1235", 10 * GB, 10); + MockNM nm3 = rm.registerNode("127.0.0.3:1236", 10 * GB, 10); + MockNM nm4 = rm.registerNode("127.0.0.4:1237", 10 * GB, 10); + ResourceScheduler scheduler = rm.getRMContext().getScheduler(); + waitforNMRegistered(scheduler, 4, 5); + + MultiNodeSortingManager mns = rm.getRMContext() + .getMultiNodeSortingManager(); + MultiNodeSorter sorter = mns + .getMultiNodePolicy(POLICY_CLASS_NAME); + sorter.reSortClusterNodes(); + + Set nodes = sorter.getMultiNodeLookupPolicy() + .getNodesPerPartition(""); + Assert.assertEquals(4, nodes.size()); + + RMApp app1 = rm.submitApp(2048, "app-1", "user1", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + SchedulerNodeReport reportNm1 = + rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); + + // Check the node report for nm1. + Assert.assertEquals(2 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(8 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + // Ideally the sorter thread would invoke this, but it runs only every 1sec. + // Hence forcefully recompute nodes. + sorter.reSortClusterNodes(); + + RMApp app2 = rm.submitApp(1024, "app-2", "user2", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + SchedulerNodeReport reportNm2 = + rm.getResourceScheduler().getNodeReport(nm2.getNodeId()); + + // Check the node report for nm2. + Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(9 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + // Ideally the sorter thread would invoke this, but it runs only every 1sec. + // Hence forcefully recompute nodes.
+ sorter.reSortClusterNodes(); + + // Node1 and Node2 now have used resources. Hence ensure these two come + // later in the list. + nodes = sorter.getMultiNodeLookupPolicy() + .getNodesPerPartition(""); + List currentNodes = new ArrayList<>(); + currentNodes.add(nm3.getNodeId()); + currentNodes.add(nm4.getNodeId()); + currentNodes.add(nm2.getNodeId()); + currentNodes.add(nm1.getNodeId()); + Iterator it = nodes.iterator(); + SchedulerNode current; + int i = 0; + while (it.hasNext()) { + current = it.next(); + Assert.assertEquals(current.getNodeID(), currentNodes.get(i++)); + } + rm.stop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index b4ebd15ccde..e239191e5e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -817,4 +817,74 @@ public class TestCapacitySchedulerNodeLabelUpdate { } return memorySize; } + + private long waitForNodeLabelSchedulerEventUpdate(MockRM rm, String partition, + long expectedNodeCount, long timeout) throws InterruptedException { + long start = System.currentTimeMillis(); + long size = 0; + while (System.currentTimeMillis() - start < timeout) { + CapacityScheduler scheduler = (CapacityScheduler) rm + .getResourceScheduler(); + size =
scheduler.getNodeTracker().getNodesPerPartition(partition).size(); + if (size == expectedNodeCount) { + return size; + } + Thread.sleep(100); + } + return size; + } + + /** + * Checks that the node tracker's per-partition node lists follow node + * registration, unregistration and a label replacement on a node. + */ + @Test + public void testNodeCountBasedOnNodeLabelsFromClusterNodeTracker() + throws Exception { + // add cluster node labels + mgr.addToCluserNodeLabelsWithDefaultExclusivity( + ImmutableSet.of("x", "y", "z")); + + // set mapping: + // h1 -> x + // h2 -> x + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 1234), toSet("x"))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet("x"))); + + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 8000); + rm.registerNode("h2:1234", 8000); + rm.registerNode("h3:1234", 8000); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Ensure that the cluster node tracker holds the correct set of nodes + // after node registration. + Assert.assertEquals(2, + cs.getNodeTracker().getNodesPerPartition("x").size()); + Assert.assertEquals(1, cs.getNodeTracker().getNodesPerPartition("").size()); + + rm.unRegisterNode(nm1); + rm.registerNode("h4:1234", 8000); + + // Ensure that the cluster node tracker holds the correct set of nodes + // after the old node is unregistered and a new node is registered. + Assert.assertEquals(1, + cs.getNodeTracker().getNodesPerPartition("x").size()); + Assert.assertEquals(2, cs.getNodeTracker().getNodesPerPartition("").size()); + + mgr.replaceLabelsOnNode( + ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet(""))); + + // The label on the last node in partition x is replaced, as the CLI or + // REST would do; wait for the scheduler to see zero nodes in x. + Assert.assertEquals(0, + waitForNodeLabelSchedulerEventUpdate(rm, "x", 0, 3000L)); + } }