YARN-7494. Add muti-node lookup mechanism and pluggable nodes sorting policies to optimize placement decision. Contributed by Sunil Govindan.

This commit is contained in:
Weiwei Yang 2018-08-21 22:42:23 +08:00
parent 54d0bf8935
commit 9c3fc3ef28
29 changed files with 1210 additions and 92 deletions

View File

@ -43,9 +43,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@ -113,6 +115,7 @@ public class RMActiveServiceContext {
private AllocationTagsManager allocationTagsManager; private AllocationTagsManager allocationTagsManager;
private PlacementConstraintManager placementConstraintManager; private PlacementConstraintManager placementConstraintManager;
private ResourceProfilesManager resourceProfilesManager; private ResourceProfilesManager resourceProfilesManager;
private MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager;
public RMActiveServiceContext() { public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager(); queuePlacementManager = new PlacementManager();
@ -441,6 +444,19 @@ public class RMActiveServiceContext {
rmDelegatedNodeLabelsUpdater = nodeLablesUpdater; rmDelegatedNodeLabelsUpdater = nodeLablesUpdater;
} }
@Private
@Unstable
public MultiNodeSortingManager<SchedulerNode> getMultiNodeSortingManager() {
return multiNodeSortingManager;
}
@Private
@Unstable
public void setMultiNodeSortingManager(
MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager) {
this.multiNodeSortingManager = multiNodeSortingManager;
}
@Private @Private
@Unstable @Unstable
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {

View File

@ -42,10 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@ -177,4 +178,9 @@ public interface RMContext extends ApplicationMasterServiceContext {
void setPlacementConstraintManager( void setPlacementConstraintManager(
PlacementConstraintManager placementConstraintManager); PlacementConstraintManager placementConstraintManager);
MultiNodeSortingManager<SchedulerNode> getMultiNodeSortingManager();
void setMultiNodeSortingManager(
MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager);
} }

View File

@ -48,10 +48,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@ -538,6 +539,17 @@ public class RMContextImpl implements RMContext {
delegatedNodeLabelsUpdater); delegatedNodeLabelsUpdater);
} }
@Override
public MultiNodeSortingManager<SchedulerNode> getMultiNodeSortingManager() {
return activeServiceContext.getMultiNodeSortingManager();
}
@Override
public void setMultiNodeSortingManager(
MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager) {
activeServiceContext.setMultiNodeSortingManager(multiNodeSortingManager);
}
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
} }

View File

@ -96,11 +96,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
@ -546,6 +548,10 @@ public class ResourceManager extends CompositeService
return new FederationStateStoreService(rmContext); return new FederationStateStoreService(rmContext);
} }
protected MultiNodeSortingManager<SchedulerNode> createMultiNodeSortingManager() {
return new MultiNodeSortingManager<SchedulerNode>();
}
protected SystemMetricsPublisher createSystemMetricsPublisher() { protected SystemMetricsPublisher createSystemMetricsPublisher() {
List<SystemMetricsPublisher> publishers = List<SystemMetricsPublisher> publishers =
new ArrayList<SystemMetricsPublisher>(); new ArrayList<SystemMetricsPublisher>();
@ -665,6 +671,12 @@ public class ResourceManager extends CompositeService
resourceProfilesManager.init(conf); resourceProfilesManager.init(conf);
rmContext.setResourceProfilesManager(resourceProfilesManager); rmContext.setResourceProfilesManager(resourceProfilesManager);
MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager =
createMultiNodeSortingManager();
multiNodeSortingManager.setRMContext(rmContext);
addService(multiNodeSortingManager);
rmContext.setMultiNodeSortingManager(multiNodeSortingManager);
RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
createRMDelegatedNodeLabelsUpdater(); createRMDelegatedNodeLabelsUpdater();
if (delegatedNodeLabelsUpdater != null) { if (delegatedNodeLabelsUpdater != null) {

View File

@ -93,7 +93,7 @@ public class AppSchedulingInfo {
private final ReentrantReadWriteLock.WriteLock writeLock; private final ReentrantReadWriteLock.WriteLock writeLock;
public final ContainerUpdateContext updateContext; public final ContainerUpdateContext updateContext;
public final Map<String, String> applicationSchedulingEnvs = new HashMap<>(); private final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
private final RMContext rmContext; private final RMContext rmContext;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
@ -782,4 +782,13 @@ public class AppSchedulingInfo {
this.readLock.unlock(); this.readLock.unlock();
} }
} }
/**
* Get scheduling envs configured for this application.
*
* @return a map of applicationSchedulingEnvs
*/
public Map<String, String> getApplicationSchedulingEnvs() {
return applicationSchedulingEnvs;
}
} }

View File

@ -37,6 +37,7 @@ import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -57,6 +58,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
private HashMap<NodeId, N> nodes = new HashMap<>(); private HashMap<NodeId, N> nodes = new HashMap<>();
private Map<String, N> nodeNameToNodeMap = new HashMap<>(); private Map<String, N> nodeNameToNodeMap = new HashMap<>();
private Map<String, List<N>> nodesPerRack = new HashMap<>(); private Map<String, List<N>> nodesPerRack = new HashMap<>();
private Map<String, List<N>> nodesPerLabel = new HashMap<>();
private Resource clusterCapacity = Resources.createResource(0, 0); private Resource clusterCapacity = Resources.createResource(0, 0);
private volatile Resource staleClusterCapacity = private volatile Resource staleClusterCapacity =
@ -80,6 +82,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
nodes.put(node.getNodeID(), node); nodes.put(node.getNodeID(), node);
nodeNameToNodeMap.put(node.getNodeName(), node); nodeNameToNodeMap.put(node.getNodeName(), node);
List<N> nodesPerLabels = nodesPerLabel.get(node.getPartition());
if (nodesPerLabels == null) {
nodesPerLabels = new ArrayList<N>();
}
nodesPerLabels.add(node);
// Update new set of nodes for given partition.
nodesPerLabel.put(node.getPartition(), nodesPerLabels);
// Update nodes per rack as well // Update nodes per rack as well
String rackName = node.getRackName(); String rackName = node.getRackName();
List<N> nodesList = nodesPerRack.get(rackName); List<N> nodesList = nodesPerRack.get(rackName);
@ -174,6 +186,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
} }
} }
List<N> nodesPerPartition = nodesPerLabel.get(node.getPartition());
nodesPerPartition.remove(node);
// Update new set of nodes for given partition.
if (nodesPerPartition.isEmpty()) {
nodesPerLabel.remove(node.getPartition());
} else {
nodesPerLabel.put(node.getPartition(), nodesPerPartition);
}
// Update cluster capacity // Update cluster capacity
Resources.subtractFrom(clusterCapacity, node.getTotalResource()); Resources.subtractFrom(clusterCapacity, node.getTotalResource());
staleClusterCapacity = Resources.clone(clusterCapacity); staleClusterCapacity = Resources.clone(clusterCapacity);
@ -420,4 +442,43 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
} }
return retNodes; return retNodes;
} }
/**
* update cached nodes per partition on a node label change event.
* @param partition nodeLabel
* @param nodeIds List of Node IDs
*/
public void updateNodesPerPartition(String partition, Set<NodeId> nodeIds) {
writeLock.lock();
try {
// Clear all entries.
nodesPerLabel.remove(partition);
List<N> nodesPerPartition = new ArrayList<N>();
for (NodeId nodeId : nodeIds) {
N n = getNode(nodeId);
if (n != null) {
nodesPerPartition.add(n);
}
}
// Update new set of nodes for given partition.
nodesPerLabel.put(partition, nodesPerPartition);
} finally {
writeLock.unlock();
}
}
public List<N> getNodesPerPartition(String partition) {
List<N> nodesPerPartition = null;
readLock.lock();
try {
if (nodesPerLabel.containsKey(partition)) {
nodesPerPartition = new ArrayList<N>(nodesPerLabel.get(partition));
}
} finally {
readLock.unlock();
}
return nodesPerPartition;
}
} }

View File

@ -21,13 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
/** /**
* Utility for logging scheduler activities * Utility for logging scheduler activities
@ -63,7 +63,7 @@ public class ActivitiesLogger {
SchedulerApplicationAttempt application, Priority priority, SchedulerApplicationAttempt application, Priority priority,
String diagnostic) { String diagnostic) {
String type = "app"; String type = "app";
if (activitiesManager == null) { if (node == null || activitiesManager == null) {
return; return;
} }
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
@ -84,18 +84,18 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node, ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority, SchedulerApplicationAttempt application, Priority priority,
String diagnostic, ActivityState appState) { String diagnostic, ActivityState appState) {
if (activitiesManager == null) { if (node == null || activitiesManager == null) {
return; return;
} }
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
String type = "container"; String type = "container";
// Add application-container activity into specific node allocation. // Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node.getNodeID(), activitiesManager.addSchedulingActivityForNode(node,
application.getApplicationId().toString(), null, application.getApplicationId().toString(), null,
priority.toString(), ActivityState.SKIPPED, diagnostic, type); priority.toString(), ActivityState.SKIPPED, diagnostic, type);
type = "app"; type = "app";
// Add queue-application activity into specific node allocation. // Add queue-application activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node.getNodeID(), activitiesManager.addSchedulingActivityForNode(node,
application.getQueueName(), application.getQueueName(),
application.getApplicationId().toString(), application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.SKIPPED, application.getPriority().toString(), ActivityState.SKIPPED,
@ -121,20 +121,20 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node, ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, RMContainer updatedContainer, SchedulerApplicationAttempt application, RMContainer updatedContainer,
ActivityState activityState) { ActivityState activityState) {
if (activitiesManager == null) { if (node == null || activitiesManager == null) {
return; return;
} }
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
String type = "container"; String type = "container";
// Add application-container activity into specific node allocation. // Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node.getNodeID(), activitiesManager.addSchedulingActivityForNode(node,
application.getApplicationId().toString(), application.getApplicationId().toString(),
updatedContainer.getContainer().toString(), updatedContainer.getContainer().toString(),
updatedContainer.getContainer().getPriority().toString(), updatedContainer.getContainer().getPriority().toString(),
activityState, ActivityDiagnosticConstant.EMPTY, type); activityState, ActivityDiagnosticConstant.EMPTY, type);
type = "app"; type = "app";
// Add queue-application activity into specific node allocation. // Add queue-application activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node.getNodeID(), activitiesManager.addSchedulingActivityForNode(node,
application.getQueueName(), application.getQueueName(),
application.getApplicationId().toString(), application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.ACCEPTED, application.getPriority().toString(), ActivityState.ACCEPTED,
@ -157,13 +157,15 @@ public class ActivitiesLogger {
* update. * update.
*/ */
public static void startAppAllocationRecording( public static void startAppAllocationRecording(
ActivitiesManager activitiesManager, NodeId nodeId, long currentTime, ActivitiesManager activitiesManager, FiCaSchedulerNode node,
long currentTime,
SchedulerApplicationAttempt application) { SchedulerApplicationAttempt application) {
if (activitiesManager == null) { if (node == null || activitiesManager == null) {
return; return;
} }
activitiesManager.startAppAllocationRecording(nodeId, currentTime, activitiesManager
application); .startAppAllocationRecording(node.getNodeID(), currentTime,
application);
} }
/* /*
@ -208,7 +210,7 @@ public class ActivitiesLogger {
public static void recordQueueActivity(ActivitiesManager activitiesManager, public static void recordQueueActivity(ActivitiesManager activitiesManager,
SchedulerNode node, String parentQueueName, String queueName, SchedulerNode node, String parentQueueName, String queueName,
ActivityState state, String diagnostic) { ActivityState state, String diagnostic) {
if (activitiesManager == null) { if (node == null || activitiesManager == null) {
return; return;
} }
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
@ -240,7 +242,7 @@ public class ActivitiesLogger {
public static void finishAllocatedNodeAllocation( public static void finishAllocatedNodeAllocation(
ActivitiesManager activitiesManager, SchedulerNode node, ActivitiesManager activitiesManager, SchedulerNode node,
ContainerId containerId, AllocationState containerState) { ContainerId containerId, AllocationState containerState) {
if (activitiesManager == null) { if (node == null || activitiesManager == null) {
return; return;
} }
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
@ -277,7 +279,7 @@ public class ActivitiesLogger {
SchedulerNode node, String parentName, String childName, SchedulerNode node, String parentName, String childName,
Priority priority, ActivityState state, String diagnostic, String type) { Priority priority, ActivityState state, String diagnostic, String type) {
activitiesManager.addSchedulingActivityForNode(node.getNodeID(), parentName, activitiesManager.addSchedulingActivityForNode(node, parentName,
childName, priority != null ? priority.toString() : null, state, childName, priority != null ? priority.toString() : null, state,
diagnostic, type); diagnostic, type);

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
@ -197,11 +198,12 @@ public class ActivitiesManager extends AbstractService {
} }
// Add queue, application or container activity into specific node allocation. // Add queue, application or container activity into specific node allocation.
void addSchedulingActivityForNode(NodeId nodeID, String parentName, void addSchedulingActivityForNode(SchedulerNode node, String parentName,
String childName, String priority, ActivityState state, String diagnostic, String childName, String priority, ActivityState state, String diagnostic,
String type) { String type) {
if (shouldRecordThisNode(nodeID)) { if (shouldRecordThisNode(node.getNodeID())) {
NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID); NodeAllocation nodeAllocation = getCurrentNodeAllocation(
node.getNodeID());
nodeAllocation.addAllocationActivity(parentName, childName, priority, nodeAllocation.addAllocationActivity(parentName, childName, priority,
state, diagnostic, type); state, diagnostic, type);
} }

View File

@ -92,6 +92,7 @@ public abstract class AbstractCSQueue implements CSQueue {
Set<String> resourceTypes; Set<String> resourceTypes;
final RMNodeLabelsManager labelManager; final RMNodeLabelsManager labelManager;
String defaultLabelExpression; String defaultLabelExpression;
private String multiNodeSortingPolicyName = null;
Map<AccessType, AccessControlList> acls = Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>(); new HashMap<AccessType, AccessControlList>();
@ -414,6 +415,10 @@ public abstract class AbstractCSQueue implements CSQueue {
this.priority = configuration.getQueuePriority( this.priority = configuration.getQueuePriority(
getQueuePath()); getQueuePath());
// Update multi-node sorting algorithm for scheduling as configured.
setMultiNodeSortingPolicyName(
configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath()));
this.userWeights = getUserWeightsFromHierarchy(configuration); this.userWeights = getUserWeightsFromHierarchy(configuration);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
@ -1259,4 +1264,13 @@ public abstract class AbstractCSQueue implements CSQueue {
this.writeLock.unlock(); this.writeLock.unlock();
} }
} }
@Override
public String getMultiNodeSortingPolicyName() {
return this.multiNodeSortingPolicyName;
}
public void setMultiNodeSortingPolicyName(String policyName) {
this.multiNodeSortingPolicyName = policyName;
}
} }

View File

@ -430,4 +430,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* @return effective max queue capacity * @return effective max queue capacity
*/ */
Resource getEffectiveMaxCapacityDown(String label, Resource factor); Resource getEffectiveMaxCapacityDown(String label, Resource factor);
/**
* Get Multi Node scheduling policy name.
* @return policy name
*/
String getMultiNodeSortingPolicyName();
} }

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -251,6 +252,7 @@ public class CapacityScheduler extends
private ResourceCommitterService resourceCommitterService; private ResourceCommitterService resourceCommitterService;
private RMNodeLabelsManager labelManager; private RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager; private AppPriorityACLsManager appPriorityACLManager;
private boolean multiNodePlacementEnabled;
private static boolean printedVerboseLoggingForAsyncScheduling = false; private static boolean printedVerboseLoggingForAsyncScheduling = false;
@ -391,12 +393,23 @@ public class CapacityScheduler extends
// Setup how many containers we can allocate for each round // Setup how many containers we can allocate for each round
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
// Register CS specific multi-node policies to common MultiNodeManager
// which will add to a MultiNodeSorter which gives a pre-sorted list of
// nodes to scheduler's allocation.
multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
if(rmContext.getMultiNodeSortingManager() != null) {
rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
multiNodePlacementEnabled,
this.conf.getMultiNodePlacementPolicies());
}
LOG.info("Initialized CapacityScheduler with " + "calculator=" LOG.info("Initialized CapacityScheduler with " + "calculator="
+ getResourceCalculator().getClass() + ", " + "minimumAllocation=<" + getResourceCalculator().getClass() + ", " + "minimumAllocation=<"
+ getMinimumResourceCapability() + ">, " + "maximumAllocation=<" + getMinimumResourceCapability() + ">, " + "maximumAllocation=<"
+ getMaximumResourceCapability() + ">, " + "asynchronousScheduling=" + getMaximumResourceCapability() + ">, " + "asynchronousScheduling="
+ scheduleAsynchronously + ", " + "asyncScheduleInterval=" + scheduleAsynchronously + ", " + "asyncScheduleInterval="
+ asyncScheduleInterval + "ms"); + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled="
+ multiNodePlacementEnabled);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -1373,18 +1386,23 @@ public class CapacityScheduler extends
assignment.getAssignmentInformation().getAllocationDetails(); assignment.getAssignmentInformation().getAllocationDetails();
List<AssignmentInformation.AssignmentDetails> reservations = List<AssignmentInformation.AssignmentDetails> reservations =
assignment.getAssignmentInformation().getReservationDetails(); assignment.getAssignmentInformation().getReservationDetails();
// Get nodeId from allocated container if incoming argument is null.
NodeId updatedNodeid = (nodeId == null)
? allocations.get(allocations.size() - 1).rmContainer.getNodeId()
: nodeId;
if (!allocations.isEmpty()) { if (!allocations.isEmpty()) {
ContainerId allocatedContainerId = ContainerId allocatedContainerId =
allocations.get(allocations.size() - 1).containerId; allocations.get(allocations.size() - 1).containerId;
String allocatedQueue = allocations.get(allocations.size() - 1).queue; String allocatedQueue = allocations.get(allocations.size() - 1).queue;
schedulerHealth.updateAllocation(now, nodeId, allocatedContainerId, schedulerHealth.updateAllocation(now, updatedNodeid, allocatedContainerId,
allocatedQueue); allocatedQueue);
} }
if (!reservations.isEmpty()) { if (!reservations.isEmpty()) {
ContainerId reservedContainerId = ContainerId reservedContainerId =
reservations.get(reservations.size() - 1).containerId; reservations.get(reservations.size() - 1).containerId;
String reservedQueue = reservations.get(reservations.size() - 1).queue; String reservedQueue = reservations.get(reservations.size() - 1).queue;
schedulerHealth.updateReservation(now, nodeId, reservedContainerId, schedulerHealth.updateReservation(now, updatedNodeid, reservedContainerId,
reservedQueue); reservedQueue);
} }
schedulerHealth.updateSchedulerReservationCounts(assignment schedulerHealth.updateSchedulerReservationCounts(assignment
@ -1421,6 +1439,23 @@ public class CapacityScheduler extends
|| assignedContainers < maxAssignPerHeartbeat); || assignedContainers < maxAssignPerHeartbeat);
} }
private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
FiCaSchedulerNode node) {
CandidateNodeSet<FiCaSchedulerNode> candidates = null;
candidates = new SimpleCandidateNodeSet<>(node);
if (multiNodePlacementEnabled) {
Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
List<FiCaSchedulerNode> nodes = nodeTracker
.getNodesPerPartition(node.getPartition());
if (nodes != null && !nodes.isEmpty()) {
nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
nodesByPartition, node.getPartition());
}
}
return candidates;
}
/** /**
* We need to make sure when doing allocation, Node should be existed * We need to make sure when doing allocation, Node should be existed
* And we will construct a {@link CandidateNodeSet} before proceeding * And we will construct a {@link CandidateNodeSet} before proceeding
@ -1432,8 +1467,8 @@ public class CapacityScheduler extends
int offswitchCount = 0; int offswitchCount = 0;
int assignedContainers = 0; int assignedContainers = 0;
CandidateNodeSet<FiCaSchedulerNode> candidates = CandidateNodeSet<FiCaSchedulerNode> candidates = getCandidateNodeSet(
new SimpleCandidateNodeSet<>(node); node);
CSAssignment assignment = allocateContainersToNode(candidates, CSAssignment assignment = allocateContainersToNode(candidates,
withNodeHeartbeat); withNodeHeartbeat);
// Only check if we can allocate more container on the same node when // Only check if we can allocate more container on the same node when
@ -1599,10 +1634,13 @@ public class CapacityScheduler extends
if (Resources.greaterThan(calculator, getClusterResource(), if (Resources.greaterThan(calculator, getClusterResource(),
assignment.getResource(), Resources.none())) { assignment.getResource(), Resources.none())) {
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
NodeId nodeId = null;
if (node != null) {
nodeId = node.getNodeID();
}
if (withNodeHeartbeat) { if (withNodeHeartbeat) {
updateSchedulerHealth(lastNodeUpdateTime, updateSchedulerHealth(lastNodeUpdateTime, nodeId, assignment);
CandidateNodeSetUtils.getSingleNode(candidates).getNodeID(),
assignment);
} }
return assignment; return assignment;
} }
@ -1681,7 +1719,7 @@ public class CapacityScheduler extends
// We have two different logics to handle allocation on single node / multi // We have two different logics to handle allocation on single node / multi
// nodes. // nodes.
CSAssignment assignment; CSAssignment assignment;
if (null != node) { if (!multiNodePlacementEnabled) {
assignment = allocateContainerOnSingleNode(candidates, assignment = allocateContainerOnSingleNode(candidates,
node, withNodeHeartbeat); node, withNodeHeartbeat);
} else{ } else{
@ -1869,12 +1907,21 @@ public class CapacityScheduler extends
NodeLabelsUpdateSchedulerEvent labelUpdateEvent) { NodeLabelsUpdateSchedulerEvent labelUpdateEvent) {
try { try {
writeLock.lock(); writeLock.lock();
Set<String> updateLabels = new HashSet<String>();
for (Entry<NodeId, Set<String>> entry : labelUpdateEvent for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
.getUpdatedNodeToLabels().entrySet()) { .getUpdatedNodeToLabels().entrySet()) {
NodeId id = entry.getKey(); NodeId id = entry.getKey();
Set<String> labels = entry.getValue(); Set<String> labels = entry.getValue();
FiCaSchedulerNode node = nodeTracker.getNode(id);
if (node != null) {
// Update old partition to list.
updateLabels.add(node.getPartition());
}
updateLabelsOnNode(id, labels); updateLabelsOnNode(id, labels);
updateLabels.addAll(labels);
} }
refreshLabelToNodeCache(updateLabels);
Resource clusterResource = getClusterResource(); Resource clusterResource = getClusterResource();
getRootQueue().updateClusterResource(clusterResource, getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource)); new ResourceLimits(clusterResource));
@ -1883,6 +1930,18 @@ public class CapacityScheduler extends
} }
} }
private void refreshLabelToNodeCache(Set<String> updateLabels) {
Map<String, Set<NodeId>> labelMapping = labelManager
.getLabelsToNodes(updateLabels);
for (String label : updateLabels) {
Set<NodeId> nodes = labelMapping.get(label);
if (nodes == null) {
continue;
}
nodeTracker.updateNodesPerPartition(label, nodes);
}
}
private void addNode(RMNode nodeManager) { private void addNode(RMNode nodeManager) {
try { try {
writeLock.lock(); writeLock.lock();

View File

@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@ -2129,4 +2131,118 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
break; break;
} }
} }
@Private public static final String MULTI_NODE_SORTING_POLICIES =
PREFIX + "multi-node-sorting.policy.names";
@Private public static final String MULTI_NODE_SORTING_POLICY_NAME =
PREFIX + "multi-node-sorting.policy";
/**
* resource usage based node sorting algorithm.
*/
public static final String DEFAULT_NODE_SORTING_POLICY = "default";
public static final String DEFAULT_NODE_SORTING_POLICY_CLASSNAME
= "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";
public static final long DEFAULT_MULTI_NODE_SORTING_INTERVAL = 1000L;
@Private
public static final String MULTI_NODE_PLACEMENT_ENABLED = PREFIX
+ "multi-node-placement-enabled";
@Private
public static final boolean DEFAULT_MULTI_NODE_PLACEMENT_ENABLED = false;
public String getMultiNodesSortingAlgorithmPolicy(
String queue) {
String policyName = get(
getQueuePrefix(queue) + "multi-node-sorting.policy");
if (policyName == null) {
policyName = get(MULTI_NODE_SORTING_POLICY_NAME);
}
// If node sorting policy is not configured in queue and in cluster level,
// it is been assumed that this queue is not enabled with multi-node lookup.
if (policyName == null || policyName.isEmpty()) {
return null;
}
String policyClassName = get(MULTI_NODE_SORTING_POLICY_NAME + DOT
+ policyName.trim() + DOT + "class");
if (policyClassName == null || policyClassName.isEmpty()) {
throw new YarnRuntimeException(
policyName.trim() + " Class is not configured or not an instance of "
+ MultiNodeLookupPolicy.class.getCanonicalName());
}
return normalizePolicyName(policyClassName.trim());
}
public boolean getMultiNodePlacementEnabled() {
return getBoolean(MULTI_NODE_PLACEMENT_ENABLED,
DEFAULT_MULTI_NODE_PLACEMENT_ENABLED);
}
public Set<MultiNodePolicySpec> getMultiNodePlacementPolicies() {
String[] policies = getTrimmedStrings(MULTI_NODE_SORTING_POLICIES);
// In other cases, split the accessibleLabelStr by ","
Set<MultiNodePolicySpec> set = new HashSet<MultiNodePolicySpec>();
for (String str : policies) {
if (!str.trim().isEmpty()) {
String policyClassName = get(
MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + DOT + "class");
if (str.trim().equals(DEFAULT_NODE_SORTING_POLICY)) {
policyClassName = get(
MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + DOT + "class",
DEFAULT_NODE_SORTING_POLICY_CLASSNAME);
}
// This check is needed as default class name is loaded only for
// DEFAULT_NODE_SORTING_POLICY.
if (policyClassName == null) {
throw new YarnRuntimeException(
str.trim() + " Class is not configured or not an instance of "
+ MultiNodeLookupPolicy.class.getCanonicalName());
}
policyClassName = normalizePolicyName(policyClassName.trim());
long policySortingInterval = getLong(
MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim()
+ DOT + "sorting-interval.ms",
DEFAULT_MULTI_NODE_SORTING_INTERVAL);
if (policySortingInterval < 0) {
throw new YarnRuntimeException(
str.trim()
+ " multi-node policy is configured with invalid"
+ " sorting-interval:" + policySortingInterval);
}
set.add(
new MultiNodePolicySpec(policyClassName, policySortingInterval));
}
}
return Collections.unmodifiableSet(set);
}
private String normalizePolicyName(String policyName) {
// Ensure that custom node sorting algorithm class is valid.
try {
Class<?> nodeSortingPolicyClazz = getClassByName(policyName);
if (MultiNodeLookupPolicy.class
.isAssignableFrom(nodeSortingPolicyClazz)) {
return policyName;
} else {
throw new YarnRuntimeException(
"Class: " + policyName + " not instance of "
+ MultiNodeLookupPolicy.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate " + "NodesSortingPolicy: " + policyName, e);
}
}
} }

View File

@ -53,10 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
@ -1036,23 +1032,24 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment allocateFromReservedContainer(Resource clusterResource, private CSAssignment allocateFromReservedContainer(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates, CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); // Considering multi-node scheduling, its better to iterate through
if (null == node) { // all candidates and stop once we get atleast one good node to allocate
return null; // where reservation was made earlier. In normal case, there is only one
} // node and hence there wont be any impact after this change.
for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp application = getApplication(
reservedContainer.getApplicationAttemptId());
RMContainer reservedContainer = node.getReservedContainer(); if (null != application) {
if (reservedContainer != null) { ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
FiCaSchedulerApp application = getApplication( node, SystemClock.getInstance().getTime(), application);
reservedContainer.getApplicationAttemptId()); CSAssignment assignment = application.assignContainers(
clusterResource, candidates, currentResourceLimits,
if (null != application) { schedulingMode, reservedContainer);
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, return assignment;
node.getNodeID(), SystemClock.getInstance().getTime(), application); }
CSAssignment assignment = application.assignContainers(clusterResource,
candidates, currentResourceLimits, schedulingMode,
reservedContainer);
return assignment;
} }
} }
@ -1114,13 +1111,14 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerApp application = assignmentIterator.next(); FiCaSchedulerApp application = assignmentIterator.next();
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
node.getNodeID(), SystemClock.getInstance().getTime(), application); node, SystemClock.getInstance().getTime(), application);
// Check queue max-capacity limit // Check queue max-capacity limit
Resource appReserved = application.getCurrentReservation(); Resource appReserved = application.getCurrentReservation();
if (needAssignToQueueCheck) { if (needAssignToQueueCheck) {
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), if (!super.canAssignToThisQueue(clusterResource,
currentResourceLimits, appReserved, schedulingMode)) { candidates.getPartition(), currentResourceLimits, appReserved,
schedulingMode)) {
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
activitiesManager, node, application, application.getPriority(), activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
@ -1155,7 +1153,8 @@ public class LeafQueue extends AbstractCSQueue {
userAssignable = false; userAssignable = false;
} else { } else {
userAssignable = canAssignToUser(clusterResource, application.getUser(), userAssignable = canAssignToUser(clusterResource, application.getUser(),
userLimit, application, node.getPartition(), currentResourceLimits); userLimit, application, candidates.getPartition(),
currentResourceLimits);
if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) { if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
cul.canAssign = false; cul.canAssign = false;
cul.reservation = appReserved; cul.reservation = appReserved;

View File

@ -553,8 +553,8 @@ public class ParentQueue extends AbstractCSQueue {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueueName(), ActivityState.REJECTED, getParentName(), getQueueName(), ActivityState.REJECTED,
ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION
.getPartition()); + candidates.getPartition());
if (rootQueue) { if (rootQueue) {
ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
node); node);

View File

@ -96,11 +96,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
* headroom, etc. * headroom, etc.
*/ */
private ContainerAllocation preCheckForNodeCandidateSet( private ContainerAllocation preCheckForNodeCandidateSet(
Resource clusterResource, CandidateNodeSet<FiCaSchedulerNode> candidates, Resource clusterResource, FiCaSchedulerNode node,
SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulingMode schedulingMode, ResourceLimits resourceLimits,
SchedulerRequestKey schedulerKey) { SchedulerRequestKey schedulerKey) {
Priority priority = schedulerKey.getPriority(); Priority priority = schedulerKey.getPriority();
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey, PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
ResourceRequest.ANY); ResourceRequest.ANY);
@ -164,7 +163,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
} }
if (!checkHeadroom(clusterResource, resourceLimits, required, if (!checkHeadroom(clusterResource, resourceLimits, required,
candidates.getPartition())) { node.getPartition())) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("cannot allocate required resource=" + required LOG.debug("cannot allocate required resource=" + required
+ " because of headroom"); + " because of headroom");
@ -801,20 +800,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Do checks before determining which node to allocate // Do checks before determining which node to allocate
// Directly return if this check fails. // Directly return if this check fails.
ContainerAllocation result; ContainerAllocation result;
if (reservedContainer == null) {
result = preCheckForNodeCandidateSet(clusterResource, candidates,
schedulingMode, resourceLimits, schedulerKey);
if (null != result) {
return result;
}
} else {
// pre-check when allocating reserved container
if (application.getOutstandingAsksCount(schedulerKey) == 0) {
// Release
return new ContainerAllocation(reservedContainer, null,
AllocationState.QUEUE_SKIPPED);
}
}
AppPlacementAllocator<FiCaSchedulerNode> schedulingPS = AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
application.getAppSchedulingInfo().getAppPlacementAllocator( application.getAppSchedulingInfo().getAppPlacementAllocator(
@ -833,6 +818,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
while (iter.hasNext()) { while (iter.hasNext()) {
FiCaSchedulerNode node = iter.next(); FiCaSchedulerNode node = iter.next();
if (reservedContainer == null) {
result = preCheckForNodeCandidateSet(clusterResource, node,
schedulingMode, resourceLimits, schedulerKey);
if (null != result) {
continue;
}
} else {
// pre-check when allocating reserved container
if (application.getOutstandingAsksCount(schedulerKey) == 0) {
// Release
result = new ContainerAllocation(reservedContainer, null,
AllocationState.QUEUE_SKIPPED);
continue;
}
}
result = tryAllocateOnNode(clusterResource, node, schedulingMode, result = tryAllocateOnNode(clusterResource, node, schedulingMode,
resourceLimits, schedulerKey, reservedContainer); resourceLimits, schedulerKey, reservedContainer);

View File

@ -32,4 +32,8 @@ public class ApplicationSchedulingConfig {
@InterfaceAudience.Private @InterfaceAudience.Private
public static final Class<? extends AppPlacementAllocator> public static final Class<? extends AppPlacementAllocator>
DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class; DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class;
@InterfaceAudience.Private
public static final String ENV_MULTI_NODE_SORTING_POLICY_CLASS =
"MULTI_NODE_SORTING_POLICY_CLASS";
} }

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCap
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
@ -170,10 +171,32 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
rc = scheduler.getResourceCalculator(); rc = scheduler.getResourceCalculator();
} }
// Update multi-node sorting algorithm to scheduler envs
updateMultiNodeSortingPolicy(rmApp);
containerAllocator = new ContainerAllocator(this, rc, rmContext, containerAllocator = new ContainerAllocator(this, rc, rmContext,
activitiesManager); activitiesManager);
} }
private void updateMultiNodeSortingPolicy(RMApp rmApp) {
if (rmApp == null) {
return;
}
String queueName = null;
if (scheduler instanceof CapacityScheduler) {
queueName = getCSLeafQueue().getMultiNodeSortingPolicyName();
}
if (!appSchedulingInfo.getApplicationSchedulingEnvs().containsKey(
ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS)
&& queueName != null) {
appSchedulingInfo.getApplicationSchedulingEnvs().put(
ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS,
queueName);
}
}
public boolean containerCompleted(RMContainer rmContainer, public boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event, ContainerStatus containerStatus, RMContainerEventType event,
String partition) { String partition) {

View File

@ -24,11 +24,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -55,6 +58,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private volatile String primaryRequestedPartition = private volatile String primaryRequestedPartition =
RMNodeLabelsManager.NO_LABEL; RMNodeLabelsManager.NO_LABEL;
private MultiNodeSortingManager<N> multiNodeSortingManager = null;
private String multiNodeSortPolicyName;
private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock; private final ReentrantReadWriteLock.WriteLock writeLock;
@ -65,6 +70,26 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
writeLock = lock.writeLock(); writeLock = lock.writeLock();
} }
@SuppressWarnings("unchecked")
@Override
public void initialize(AppSchedulingInfo appSchedulingInfo,
SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
multiNodeSortPolicyName = appSchedulingInfo
.getApplicationSchedulingEnvs().get(
ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS);
multiNodeSortingManager = (MultiNodeSortingManager<N>) rmContext
.getMultiNodeSortingManager();
if (LOG.isDebugEnabled()) {
LOG.debug(
"nodeLookupPolicy used for " + appSchedulingInfo
.getApplicationId()
+ " is " + ((multiNodeSortPolicyName != null) ?
multiNodeSortPolicyName :
""));
}
}
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Iterator<N> getPreferredNodeIterator( public Iterator<N> getPreferredNodeIterator(
@ -74,11 +99,16 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
// in. // in.
N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
if (null != singleNode) { if (singleNode != null) {
return IteratorUtils.singletonIterator(singleNode); return IteratorUtils.singletonIterator(singleNode);
} }
return IteratorUtils.emptyIterator(); // singleNode will be null if Multi-node placement lookup is enabled, and
// hence could consider sorting policies.
return multiNodeSortingManager.getMultiNodeSortIterator(
candidateNodeSet.getAllNodes().values(),
candidateNodeSet.getPartition(),
multiNodeSortPolicyName);
} }
private boolean hasRequestLabelChanged(ResourceRequest requestOne, private boolean hasRequestLabelChanged(ResourceRequest requestOne,

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
/**
* <p>
* This class has the following functionality.
*
* <p>
* Provide an interface for MultiNodeLookupPolicy so that different placement
* allocator can choose nodes based on need.
* </p>
*/
public interface MultiNodeLookupPolicy<N extends SchedulerNode> {
/**
* Get iterator of preferred node depends on requirement and/or availability.
*
* @param nodes
* List of Nodes
* @param partition
* node label
*
* @return iterator of preferred node
*/
Iterator<N> getPreferredNodeIterator(Collection<N> nodes, String partition);
/**
* Refresh working nodes set for re-ordering based on the algorithm selected.
*
* @param nodes
* a collection working nm's.
*/
void addAndRefreshNodesSet(Collection<N> nodes, String partition);
/**
* Get sorted nodes per partition.
*
* @param partition
* node label
*
* @return collection of sorted nodes
*/
Set<N> getNodesPerPartition(String partition);
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
/**
* MultiNodePolicySpec contains policyName and timeout.
*/
public class MultiNodePolicySpec {
private String policyName;
private long sortingInterval;
public MultiNodePolicySpec(String policyName, long timeout) {
this.setSortingInterval(timeout);
this.setPolicyName(policyName);
}
public long getSortingInterval() {
return sortingInterval;
}
public void setSortingInterval(long timeout) {
this.sortingInterval = timeout;
}
public String getPolicyName() {
return policyName;
}
public void setPolicyName(String policyName) {
this.policyName = policyName;
}
@Override
public String toString() {
return "MultiNodePolicySpec {" +
"policyName='" + policyName + '\'' +
", sortingInterval=" + sortingInterval +
'}';
}
}

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import com.google.common.annotations.VisibleForTesting;
/**
* Common node sorting class which will do sorting based on policy spec.
* @param <N> extends SchedulerNode.
*/
public class MultiNodeSorter<N extends SchedulerNode> extends AbstractService {
private MultiNodeLookupPolicy<N> multiNodePolicy;
private static final Log LOG = LogFactory.getLog(MultiNodeSorter.class);
// ScheduledExecutorService which schedules the PreemptionChecker to run
// periodically.
private ScheduledExecutorService ses;
private ScheduledFuture<?> handler;
private volatile boolean stopped;
private RMContext rmContext;
private MultiNodePolicySpec policySpec;
public MultiNodeSorter(RMContext rmContext,
MultiNodePolicySpec policy) {
super("MultiNodeLookupPolicy");
this.rmContext = rmContext;
this.policySpec = policy;
}
@VisibleForTesting
public synchronized MultiNodeLookupPolicy<N> getMultiNodeLookupPolicy() {
return multiNodePolicy;
}
public void serviceInit(Configuration conf) throws Exception {
LOG.info("Initializing MultiNodeSorter=" + policySpec.getPolicyName()
+ ", with sorting interval=" + policySpec.getSortingInterval());
initPolicy(policySpec.getPolicyName());
super.serviceInit(conf);
}
@SuppressWarnings("unchecked")
void initPolicy(String policyName) throws YarnException {
Class<?> policyClass;
try {
policyClass = Class.forName(policyName);
} catch (ClassNotFoundException e) {
throw new YarnException(
"Invalid policy name:" + policyName + e.getMessage());
}
this.multiNodePolicy = (MultiNodeLookupPolicy<N>) ReflectionUtils
.newInstance(policyClass, null);
}
@Override
public void serviceStart() throws Exception {
LOG.info("Starting SchedulingMonitor=" + getName());
assert !stopped : "starting when already stopped";
ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(getName());
return t;
}
});
// Start sorter thread only if sorting interval is a +ve value.
if(policySpec.getSortingInterval() != 0) {
handler = ses.scheduleAtFixedRate(new SortingThread(),
0, policySpec.getSortingInterval(), TimeUnit.MILLISECONDS);
}
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
stopped = true;
if (handler != null) {
LOG.info("Stop " + getName());
handler.cancel(true);
ses.shutdown();
}
super.serviceStop();
}
@SuppressWarnings("unchecked")
@VisibleForTesting
public void reSortClusterNodes() {
Set<String> nodeLabels = new HashSet<>();
nodeLabels
.addAll(rmContext.getNodeLabelManager().getClusterNodeLabelNames());
nodeLabels.add(RMNodeLabelsManager.NO_LABEL);
for (String label : nodeLabels) {
Map<NodeId, SchedulerNode> nodesByPartition = new HashMap<>();
List<SchedulerNode> nodes = ((AbstractYarnScheduler) rmContext
.getScheduler()).getNodeTracker().getNodesPerPartition(label);
if (nodes != null && !nodes.isEmpty()) {
nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
multiNodePolicy.addAndRefreshNodesSet(
(Collection<N>) nodesByPartition.values(), label);
}
}
}
private class SortingThread implements Runnable {
@Override
public void run() {
try {
reSortClusterNodes();
} catch (Throwable t) {
// The preemption monitor does not alter structures nor do structures
// persist across invocations. Therefore, log, skip, and retry.
LOG.error("Exception raised while executing multinode"
+ " sorter, skip this run..., exception=", t);
}
}
}
/**
* Verify whether sorter thread is running or not.
*
* @return true if sorter thread is running, false otherwise.
*/
public boolean isSorterThreadRunning() {
return (handler != null);
}
}

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
/**
* Node Sorting Manager which runs all sorter threads and policies.
* @param <N> extends SchedulerNode
*/
public class MultiNodeSortingManager<N extends SchedulerNode>
extends AbstractService {
private static final Log LOG = LogFactory
.getLog(MultiNodeSortingManager.class);
private RMContext rmContext;
private Map<String, MultiNodeSorter<N>> runningMultiNodeSorters;
private Set<MultiNodePolicySpec> policySpecs = new HashSet<MultiNodePolicySpec>();
private Configuration conf;
private boolean multiNodePlacementEnabled;
public MultiNodeSortingManager() {
super("MultiNodeSortingManager");
this.runningMultiNodeSorters = new ConcurrentHashMap<>();
}
@Override
public void serviceInit(Configuration configuration) throws Exception {
LOG.info("Initializing NodeSortingService=" + getName());
super.serviceInit(configuration);
this.conf = configuration;
}
@Override
public void serviceStart() throws Exception {
LOG.info("Starting NodeSortingService=" + getName());
createAllPolicies();
super.serviceStart();
}
@Override
public void serviceStop() throws Exception {
for (MultiNodeSorter<N> sorter : runningMultiNodeSorters.values()) {
sorter.stop();
}
super.serviceStop();
}
private void createAllPolicies() {
if (!multiNodePlacementEnabled) {
return;
}
for (MultiNodePolicySpec policy : policySpecs) {
MultiNodeSorter<N> mon = new MultiNodeSorter<N>(rmContext, policy);
mon.init(conf);
mon.start();
runningMultiNodeSorters.put(policy.getPolicyName(), mon);
}
}
public MultiNodeSorter<N> getMultiNodePolicy(String name) {
return runningMultiNodeSorters.get(name);
}
public void setRMContext(RMContext context) {
this.rmContext = context;
}
public void registerMultiNodePolicyNames(
boolean isMultiNodePlacementEnabled,
Set<MultiNodePolicySpec> multiNodePlacementPolicies) {
this.policySpecs.addAll(multiNodePlacementPolicies);
this.multiNodePlacementEnabled = isMultiNodePlacementEnabled;
LOG.info("MultiNode scheduling is '" + multiNodePlacementEnabled +
"', and configured policies are " + StringUtils
.join(policySpecs.iterator(), ","));
}
public Iterator<N> getMultiNodeSortIterator(Collection<N> nodes,
String partition, String policyName) {
// nodeLookupPolicy can be null if app is configured with invalid policy.
// in such cases, use the the first node.
if(policyName == null) {
LOG.warn("Multi Node scheduling is enabled, however invalid class is"
+ " configured. Valid sorting policy has to be configured in"
+ " yarn.scheduler.capacity.<queue>.multi-node-sorting.policy");
return IteratorUtils.singletonIterator(
nodes.iterator().next());
}
MultiNodeSorter multiNodeSorter = getMultiNodePolicy(policyName);
if (multiNodeSorter == null) {
LOG.warn(
"MultiNode policy '" + policyName + "' is configured, however " +
"yarn.scheduler.capacity.multi-node-placement-enabled is false");
return IteratorUtils.singletonIterator(
nodes.iterator().next());
}
MultiNodeLookupPolicy<N> policy = multiNodeSorter
.getMultiNodeLookupPolicy();
// If sorter thread is not running, refresh node set.
if (!multiNodeSorter.isSorterThreadRunning()) {
policy.addAndRefreshNodesSet(nodes, partition);
}
return policy.getPreferredNodeIterator(nodes, partition);
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import java.util.Comparator;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
/**
* <p>
* This class has the following functionality:
*
* <p>
* ResourceUsageMultiNodeLookupPolicy holds sorted nodes list based on the
* resource usage of nodes at given time.
* </p>
*/
public class ResourceUsageMultiNodeLookupPolicy<N extends SchedulerNode>
implements MultiNodeLookupPolicy<N> {
protected Map<String, Set<N>> nodesPerPartition = new ConcurrentHashMap<>();
protected Comparator<N> comparator;
public ResourceUsageMultiNodeLookupPolicy() {
this.comparator = new Comparator<N>() {
@Override
public int compare(N o1, N o2) {
int allocatedDiff = o1.getAllocatedResource()
.compareTo(o2.getAllocatedResource());
if (allocatedDiff == 0) {
return o1.getNodeID().compareTo(o2.getNodeID());
}
return allocatedDiff;
}
};
}
@Override
public Iterator<N> getPreferredNodeIterator(Collection<N> nodes,
String partition) {
return getNodesPerPartition(partition).iterator();
}
@Override
public void addAndRefreshNodesSet(Collection<N> nodes,
String partition) {
Set<N> nodeList = new ConcurrentSkipListSet<N>(comparator);
nodeList.addAll(nodes);
nodesPerPartition.put(partition, Collections.unmodifiableSet(nodeList));
}
@Override
public Set<N> getNodesPerPartition(String partition) {
return nodesPerPartition.getOrDefault(partition, Collections.emptySet());
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -295,6 +296,8 @@ public class ReservationSystemTestUtil {
}); });
mockRmContext.setNodeLabelManager(nlm); mockRmContext.setNodeLabelManager(nlm);
mockRmContext
.setMultiNodeSortingManager(mock(MultiNodeSortingManager.class));
return mockRmContext; return mockRmContext;
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -118,7 +119,7 @@ public class TestAppSchedulingInfo {
doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
AppSchedulingInfo info = new AppSchedulingInfo( AppSchedulingInfo info = new AppSchedulingInfo(
appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
new ResourceUsage(), new HashMap<>(), null); new ResourceUsage(), new HashMap<>(), mock(RMContext.class));
Assert.assertEquals(0, info.getSchedulerKeys().size()); Assert.assertEquals(0, info.getSchedulerKeys().size());
Priority pri1 = Priority.newInstance(1); Priority pri1 = Priority.newInstance(1);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Assert; import org.junit.Assert;
import java.util.Set; import java.util.Set;
@ -76,4 +77,16 @@ public class CapacitySchedulerTestBase {
.getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label) .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
.getMemorySize() > 0); .getMemorySize() > 0);
} }
protected void waitforNMRegistered(ResourceScheduler scheduler, int nodecount,
int timesec) throws InterruptedException {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timesec * 1000) {
if (scheduler.getNumClusterNodes() < nodecount) {
Thread.sleep(100);
} else {
break;
}
}
}
} }

View File

@ -106,8 +106,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyConta
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement
.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@ -172,7 +170,6 @@ import org.mockito.Mockito;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -4871,18 +4868,6 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
return cs; return cs;
} }
private void waitforNMRegistered(ResourceScheduler scheduler, int nodecount,
int timesec) throws InterruptedException {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timesec * 1000) {
if (scheduler.getNumClusterNodes() < nodecount) {
Thread.sleep(100);
} else {
break;
}
}
}
@Test (timeout = 60000) @Test (timeout = 60000)
public void testClearRequestsBeforeApplyTheProposal() public void testClearRequestsBeforeApplyTheProposal()
throws Exception { throws Exception {

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test class for Multi Node scheduling related tests.
*/
public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
private static final Log LOG = LogFactory
.getLog(TestCapacitySchedulerMultiNodes.class);
private CapacitySchedulerConfiguration conf;
private static final String POLICY_CLASS_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";
@Before
public void setUp() {
CapacitySchedulerConfiguration config =
new CapacitySchedulerConfiguration();
config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getName());
conf = new CapacitySchedulerConfiguration(config);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
"resource-based");
conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
"resource-based");
String policyName =
CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ ".resource-based" + ".class";
conf.set(policyName, POLICY_CLASS_NAME);
conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
true);
conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
}
@Test
public void testMultiNodeSorterForScheduling() throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
rm.registerNode("127.0.0.1:1234", 10 * GB);
rm.registerNode("127.0.0.1:1235", 10 * GB);
rm.registerNode("127.0.0.1:1236", 10 * GB);
rm.registerNode("127.0.0.1:1237", 10 * GB);
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
waitforNMRegistered(scheduler, 4, 5);
MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
.getMultiNodeSortingManager();
MultiNodeSorter<SchedulerNode> sorter = mns
.getMultiNodePolicy(POLICY_CLASS_NAME);
sorter.reSortClusterNodes();
Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy()
.getNodesPerPartition("");
Assert.assertEquals(4, nodes.size());
rm.stop();
}
@Test
public void testMultiNodeSorterForSchedulingWithOrdering() throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 10);
MockNM nm2 = rm.registerNode("127.0.0.2:1235", 10 * GB, 10);
MockNM nm3 = rm.registerNode("127.0.0.3:1236", 10 * GB, 10);
MockNM nm4 = rm.registerNode("127.0.0.4:1237", 10 * GB, 10);
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
waitforNMRegistered(scheduler, 4, 5);
MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
.getMultiNodeSortingManager();
MultiNodeSorter<SchedulerNode> sorter = mns
.getMultiNodePolicy(POLICY_CLASS_NAME);
sorter.reSortClusterNodes();
Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy()
.getNodesPerPartition("");
Assert.assertEquals(4, nodes.size());
RMApp app1 = rm.submitApp(2048, "app-1", "user1", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
SchedulerNodeReport reportNm1 =
rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
// check node report
Assert.assertEquals(2 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(8 * GB,
reportNm1.getAvailableResource().getMemorySize());
// Ideally thread will invoke this, but thread operates every 1sec.
// Hence forcefully recompute nodes.
sorter.reSortClusterNodes();
RMApp app2 = rm.submitApp(1024, "app-2", "user2", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
SchedulerNodeReport reportNm2 =
rm.getResourceScheduler().getNodeReport(nm2.getNodeId());
// check node report
Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
Assert.assertEquals(9 * GB,
reportNm2.getAvailableResource().getMemorySize());
// Ideally thread will invoke this, but thread operates every 1sec.
// Hence forcefully recompute nodes.
sorter.reSortClusterNodes();
// Node1 and Node2 are now having used resources. Hence ensure these 2 comes
// latter in the list.
nodes = sorter.getMultiNodeLookupPolicy()
.getNodesPerPartition("");
List<NodeId> currentNodes = new ArrayList<>();
currentNodes.add(nm3.getNodeId());
currentNodes.add(nm4.getNodeId());
currentNodes.add(nm2.getNodeId());
currentNodes.add(nm1.getNodeId());
Iterator<SchedulerNode> it = nodes.iterator();
SchedulerNode current;
int i = 0;
while (it.hasNext()) {
current = it.next();
Assert.assertEquals(current.getNodeID(), currentNodes.get(i++));
}
rm.stop();
}
}

View File

@ -817,4 +817,74 @@ public class TestCapacitySchedulerNodeLabelUpdate {
} }
return memorySize; return memorySize;
} }
private long waitForNodeLabelSchedulerEventUpdate(MockRM rm, String partition,
long expectedNodeCount, long timeout) throws InterruptedException {
long start = System.currentTimeMillis();
long size = 0;
while (System.currentTimeMillis() - start < timeout) {
CapacityScheduler scheduler = (CapacityScheduler) rm
.getResourceScheduler();
size = scheduler.getNodeTracker().getNodesPerPartition(partition).size();
if (size == expectedNodeCount) {
return size;
}
Thread.sleep(100);
}
return size;
}
@Test
public void testNodeCountBasedOnNodeLabelsFromClusterNodeTracker()
throws Exception {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(
ImmutableSet.of("x", "y", "z"));
// set mapping:
// h1 -> x
// h2 -> y
mgr.addLabelsToNode(
ImmutableMap.of(NodeId.newInstance("h1", 1234), toSet("x")));
mgr.addLabelsToNode(
ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet("x")));
// inject node label manager
MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 8000);
rm.registerNode("h2:1234", 8000);
rm.registerNode("h3:1234", 8000);
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// Ensure that cluster node tracker is updated with correct set of node
// after Node registration.
Assert.assertEquals(2,
cs.getNodeTracker().getNodesPerPartition("x").size());
Assert.assertEquals(1, cs.getNodeTracker().getNodesPerPartition("").size());
rm.unRegisterNode(nm1);
rm.registerNode("h4:1234", 8000);
// Ensure that cluster node tracker is updated with correct set of node
// after new Node registration and old node label change.
Assert.assertEquals(1,
cs.getNodeTracker().getNodesPerPartition("x").size());
Assert.assertEquals(2, cs.getNodeTracker().getNodesPerPartition("").size());
mgr.replaceLabelsOnNode(
ImmutableMap.of(NodeId.newInstance("h2", 1234), toSet("")));
// Last node with label x is replaced by CLI or REST.
Assert.assertEquals(0,
waitForNodeLabelSchedulerEventUpdate(rm, "x", 0, 3000L));
}
} }